Unverified Commit ec5ba309 authored by Marc Gravell's avatar Marc Gravell Committed by GitHub

Async write path (#1056)

* start work on a true async write path (no sync flush); at the moment only existing async code uses this - once stable, we can move more code to the full async path

* finish prep work before tackling subscriber lock (note: still lots of handshake options to asyncify)

* implement a background queue that represents the (necessarily ordered) subscription operations, rather than executing them synchronously (sync over async, locks, etc)
parent 89b8f64b
......@@ -164,7 +164,7 @@ private async Task OnMessageSyncImpl()
while (!Completion.IsCompleted)
{
ChannelMessage next;
try { if (!TryRead(out next)) next = await ReadAsync().ConfigureAwait(false); }
try { if (!TryRead(out next)) next = await ReadAsync().ForAwait(); }
catch (ChannelClosedException) { break; } // expected
catch (Exception ex)
{
......@@ -195,7 +195,7 @@ private async Task OnMessageAsyncImpl()
while (!Completion.IsCompleted)
{
ChannelMessage next;
try { if (!TryRead(out next)) next = await ReadAsync().ConfigureAwait(false); }
try { if (!TryRead(out next)) next = await ReadAsync().ForAwait(); }
catch (ChannelClosedException) { break; } // expected
catch (Exception ex)
{
......@@ -206,7 +206,7 @@ private async Task OnMessageAsyncImpl()
try
{
var task = handler(next);
if (task != null && task.Status != TaskStatus.RanToCompletion) await task.ConfigureAwait(false);
if (task != null && task.Status != TaskStatus.RanToCompletion) await task.ForAwait();
}
catch { } // matches MessageCompletable
}
......@@ -229,7 +229,7 @@ internal async Task UnsubscribeAsyncImpl(Exception error = null, CommandFlags fl
_parent = null;
if (parent != null)
{
await parent.UnsubscribeAsync(Channel, HandleMessage, flags).ConfigureAwait(false);
await parent.UnsubscribeAsync(Channel, HandleMessage, flags).ForAwait();
}
_queue.Writer.TryComplete(error);
}
......
using System;
using System.Collections.Generic;
#pragma warning disable RCS1231
namespace StackExchange.Redis
{
/// <summary>
......@@ -303,6 +305,8 @@ public static Condition StringNotEqual(RedisKey key, RedisValue value)
/// <param name="count">The number of members which sorted set must not have.</param>
public static Condition SortedSetScoreNotExists(RedisKey key, RedisValue score, RedisValue count) => new SortedSetScoreCondition(key, score, false, count);
#pragma warning restore RCS1231
internal abstract void CheckCommands(CommandMap commandMap);
internal abstract IEnumerable<Message> CreateMessages(int db, ResultBox resultBox);
......@@ -314,12 +318,14 @@ internal sealed class ConditionProcessor : ResultProcessor<bool>
{
public static readonly ConditionProcessor Default = new ConditionProcessor();
public static Message CreateMessage(Condition condition, int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value = default(RedisValue))
#pragma warning disable RCS1231 // Make parameter ref read-only.
public static Message CreateMessage(Condition condition, int db, CommandFlags flags, RedisCommand command, in RedisKey key, RedisValue value = default(RedisValue))
#pragma warning restore RCS1231 // Make parameter ref read-only.
{
return new ConditionMessage(condition, db, flags, command, key, value);
}
public static Message CreateMessage(Condition condition, int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value, RedisValue value1)
public static Message CreateMessage(Condition condition, int db, CommandFlags flags, RedisCommand command, in RedisKey key, in RedisValue value, in RedisValue value1)
{
return new ConditionMessage(condition, db, flags, command, key, value, value1);
}
......@@ -343,14 +349,14 @@ private class ConditionMessage : Message.CommandKeyBase
private readonly RedisValue value;
private readonly RedisValue value1;
public ConditionMessage(Condition condition, int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value)
public ConditionMessage(Condition condition, int db, CommandFlags flags, RedisCommand command, in RedisKey key, in RedisValue value)
: base(db, flags, command, key)
{
Condition = condition;
this.value = value; // note no assert here
}
public ConditionMessage(Condition condition, int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value, RedisValue value1)
public ConditionMessage(Condition condition, int db, CommandFlags flags, RedisCommand command, in RedisKey key, in RedisValue value, in RedisValue value1)
: this(condition, db, flags, command, key, value)
{
this.value1 = value1; // note no assert here
......@@ -391,7 +397,7 @@ internal override Condition MapKeys(Func<RedisKey, RedisKey> map)
return new ExistsCondition(map(key), type, expectedValue, expectedResult);
}
public ExistsCondition(RedisKey key, RedisType type, RedisValue expectedValue, bool expectedResult)
public ExistsCondition(in RedisKey key, RedisType type, in RedisValue expectedValue, bool expectedResult)
{
if (key.IsNull) throw new ArgumentException("key");
this.key = key;
......@@ -481,7 +487,7 @@ internal override Condition MapKeys(Func<RedisKey, RedisKey> map)
private readonly RedisType type;
private readonly RedisCommand cmd;
public EqualsCondition(RedisKey key, RedisType type, RedisValue memberName, bool expectedEqual, RedisValue expectedValue)
public EqualsCondition(in RedisKey key, RedisType type, in RedisValue memberName, bool expectedEqual, in RedisValue expectedValue)
{
if (key.IsNull) throw new ArgumentException("key");
this.key = key;
......@@ -575,7 +581,7 @@ internal override Condition MapKeys(Func<RedisKey, RedisKey> map)
private readonly long index;
private readonly RedisValue? expectedValue;
private readonly RedisKey key;
public ListCondition(RedisKey key, long index, bool expectedResult, RedisValue? expectedValue)
public ListCondition(in RedisKey key, long index, bool expectedResult, in RedisValue? expectedValue)
{
if (key.IsNull) throw new ArgumentException(nameof(key));
this.key = key;
......@@ -645,7 +651,7 @@ internal override Condition MapKeys(Func<RedisKey, RedisKey> map)
private readonly RedisType type;
private readonly RedisCommand cmd;
public LengthCondition(RedisKey key, RedisType type, int compareToResult, long expectedLength)
public LengthCondition(in RedisKey key, RedisType type, int compareToResult, long expectedLength)
{
if (key.IsNull) throw new ArgumentException(nameof(key));
this.key = key;
......@@ -737,7 +743,7 @@ internal override Condition MapKeys(Func<RedisKey, RedisKey> map)
private readonly RedisValue sortedSetScore, expectedValue;
private readonly RedisKey key;
public SortedSetScoreCondition(RedisKey key, RedisValue sortedSetScore, bool expectedEqual, RedisValue expectedValue)
public SortedSetScoreCondition(in RedisKey key, in RedisValue sortedSetScore, bool expectedEqual, in RedisValue expectedValue)
{
if (key.IsNull)
{
......
......@@ -291,10 +291,13 @@ public int ConnectTimeout
/// </summary>
public bool HighPrioritySocketThreads { get { return highPrioritySocketThreads ?? true; } set { highPrioritySocketThreads = value; } }
// Use coalesce expression.
/// <summary>
/// Specifies the time in seconds at which connections should be pinged to ensure validity
/// </summary>
#pragma warning disable RCS1128
public int KeepAlive { get { return keepAlive.GetValueOrDefault(-1); } set { keepAlive = value; } }
#pragma warning restore RCS1128 // Use coalesce expression.
/// <summary>
/// The password to use to authenticate with the server.
......@@ -363,7 +366,9 @@ public bool PreserveAsyncOrder
/// <summary>
/// Specifies the time in milliseconds that the system should allow for synchronous operations (defaults to 1 second)
/// </summary>
#pragma warning disable RCS1128
public int SyncTimeout { get { return syncTimeout.GetValueOrDefault(5000); } set { syncTimeout = value; } }
#pragma warning restore RCS1128
/// <summary>
/// Tie-breaker used to choose between masters (must match the endpoint exactly)
......@@ -383,7 +388,9 @@ public bool PreserveAsyncOrder
/// <summary>
/// Check configuration every n seconds (every minute by default)
/// </summary>
#pragma warning disable RCS1128
public int ConfigCheckSeconds { get { return configCheckSeconds.GetValueOrDefault(60); } set { configCheckSeconds = value; } }
#pragma warning restore RCS1128
/// <summary>
/// Parse the configuration from a comma-delimited configuration string
......
......@@ -21,8 +21,6 @@ namespace StackExchange.Redis
/// </summary>
public sealed partial class ConnectionMultiplexer : IInternalConnectionMultiplexer // implies : IConnectionMultiplexer and : IDisposable
{
private const string timeoutHelpLink = "https://stackexchange.github.io/StackExchange.Redis/Timeouts";
private static TaskFactory _factory = null;
#if DEBUG
......@@ -105,7 +103,7 @@ private static string GetDefaultClientName()
/// </summary>
internal static string TryGetAzureRoleInstanceIdNoThrow()
{
string roleInstanceId = null;
string roleInstanceId;
// TODO: CoreCLR port pending https://github.com/dotnet/coreclr/issues/919
try
{
......@@ -379,7 +377,9 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
if (!node.IsConnected) continue;
LogLocked(log, "Attempting to set tie-breaker on {0}...", Format.ToString(node.EndPoint));
msg = Message.Create(0, flags, RedisCommand.SET, tieBreakerKey, newMaster);
node.WriteDirectFireAndForget(msg, ResultProcessor.DemandOK);
#pragma warning disable CS0618
node.WriteDirectFireAndForgetSync(msg, ResultProcessor.DemandOK);
#pragma warning restore CS0618
}
}
......@@ -400,7 +400,9 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
{
LogLocked(log, "Resending tie-breaker to {0}...", Format.ToString(server.EndPoint));
msg = Message.Create(0, flags, RedisCommand.SET, tieBreakerKey, newMaster);
server.WriteDirectFireAndForget(msg, ResultProcessor.DemandOK);
#pragma warning disable CS0618
server.WriteDirectFireAndForgetSync(msg, ResultProcessor.DemandOK);
#pragma warning restore CS0618
}
// There's an inherent race here in zero-lantency environments (e.g. when Redis is on localhost) when a broadcast is specified
......@@ -420,7 +422,9 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
if (!node.IsConnected) continue;
LogLocked(log, "Broadcasting via {0}...", Format.ToString(node.EndPoint));
msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, newMaster);
node.WriteDirectFireAndForget(msg, ResultProcessor.Int64);
#pragma warning disable CS0618
node.WriteDirectFireAndForgetSync(msg, ResultProcessor.Int64);
#pragma warning restore CS0618
}
}
......@@ -432,7 +436,9 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
LogLocked(log, "Enslaving {0}...", Format.ToString(node.EndPoint));
msg = RedisServer.CreateSlaveOfMessage(server.EndPoint, flags);
node.WriteDirectFireAndForget(msg, ResultProcessor.DemandOK);
#pragma warning disable CS0618
node.WriteDirectFireAndForgetSync(msg, ResultProcessor.DemandOK);
#pragma warning restore CS0618
}
}
......@@ -690,20 +696,20 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
}
var watch = Stopwatch.StartNew();
LogLockedWithThreadPoolStats(log, "Awaiting task completion", out int busyWorkerCount);
LogLockedWithThreadPoolStats(log, "Awaiting task completion", out _);
try
{
// if none error, great
var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds);
if (remaining <= 0)
{
LogLockedWithThreadPoolStats(log, "Timeout before awaiting for tasks", out busyWorkerCount);
LogLockedWithThreadPoolStats(log, "Timeout before awaiting for tasks", out _);
return false;
}
var allTasks = Task.WhenAll(tasks).ObserveErrors();
bool all = await allTasks.TimeoutAfter(timeoutMs: remaining).ObserveErrors().ForAwait();
LogLockedWithThreadPoolStats(log, all ? "All tasks completed cleanly" : $"Not all tasks completed cleanly (from {caller}#{callerLineNumber}, timeout {timeoutMilliseconds}ms)", out busyWorkerCount);
LogLockedWithThreadPoolStats(log, all ? "All tasks completed cleanly" : $"Not all tasks completed cleanly (from {caller}#{callerLineNumber}, timeout {timeoutMilliseconds}ms)", out _);
return all;
}
catch
......@@ -719,7 +725,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds);
if (remaining <= 0)
{
LogLockedWithThreadPoolStats(log, "Timeout awaiting tasks", out busyWorkerCount);
LogLockedWithThreadPoolStats(log, "Timeout awaiting tasks", out _);
return false;
}
try
......@@ -730,7 +736,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
{ }
}
}
LogLockedWithThreadPoolStats(log, "Finished awaiting tasks", out busyWorkerCount);
LogLockedWithThreadPoolStats(log, "Finished awaiting tasks", out _);
return false;
}
......@@ -1724,7 +1730,10 @@ private void ResetAllNonConnected()
}
}
#pragma warning disable IDE0060
partial void OnTraceLog(TextWriter log, [CallerMemberName] string caller = null);
#pragma warning restore IDE0060
private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, ServerEndPoint[] servers, bool useTieBreakers, Task<string>[] tieBreakers, List<ServerEndPoint> masters)
{
Dictionary<string, int> uniques = null;
......@@ -1901,7 +1910,7 @@ internal ServerEndPoint SelectServer(RedisCommand command, CommandFlags flags, i
return ServerSelectionStrategy.Select(command, key, flags);
}
private WriteResult TryPushMessageToBridge<T>(Message message, ResultProcessor<T> processor, ResultBox<T> resultBox, ref ServerEndPoint server)
private bool PrepareToPushMessageToBridge<T>(Message message, ResultProcessor<T> processor, ResultBox<T> resultBox, ref ServerEndPoint server)
{
message.SetSource(processor, resultBox);
......@@ -1950,11 +1959,19 @@ private WriteResult TryPushMessageToBridge<T>(Message message, ResultProcessor<T
}
Trace("Queueing on server: " + message);
return server.TryWrite(message);
return true;
}
Trace("No server or server unavailable - aborting: " + message);
return WriteResult.NoConnectionAvailable;
return false;
}
private ValueTask<WriteResult> TryPushMessageToBridgeAsync<T>(Message message, ResultProcessor<T> processor, ResultBox<T> resultBox, ref ServerEndPoint server)
=> PrepareToPushMessageToBridge(message, processor, resultBox, ref server) ? server.TryWriteAsync(message) : new ValueTask<WriteResult>(WriteResult.NoConnectionAvailable);
[Obsolete("prefer async")]
#pragma warning disable CS0618
private WriteResult TryPushMessageToBridgeSync<T>(Message message, ResultProcessor<T> processor, ResultBox<T> resultBox, ref ServerEndPoint server)
=> PrepareToPushMessageToBridge(message, processor, resultBox, ref server) ? server.TryWriteSync(message) : WriteResult.NoConnectionAvailable;
#pragma warning restore CS0618
/// <summary>
/// See Object.ToString()
......@@ -2111,14 +2128,18 @@ internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> process
if (message.IsFireAndForget)
{
TryPushMessageToBridge(message, processor, null, ref server);
TryPushMessageToBridgeAsync(message, processor, null, ref server);
return CompletedTask<T>.Default(null); // F+F explicitly does not get async-state
}
else
{
var tcs = TaskSource.Create<T>(state);
var source = ResultBox<T>.Get(tcs);
var result = TryPushMessageToBridge(message, processor, source, ref server);
var write = TryPushMessageToBridgeAsync(message, processor, source, ref server);
if (!write.IsCompletedSuccessfully) return ExecuteAsyncImpl_Awaited<T>(this, write, tcs, message, server);
var result = write.Result;
if (result != WriteResult.Success)
{
var ex = GetException(result, message, server);
......@@ -2127,6 +2148,18 @@ internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> process
return tcs.Task;
}
}
private static async Task<T> ExecuteAsyncImpl_Awaited<T>(ConnectionMultiplexer @this, ValueTask<WriteResult> write, TaskCompletionSource<T> tcs, Message message, ServerEndPoint server)
{
var result = await write.ForAwait();
if (result != WriteResult.Success)
{
var ex = @this.GetException(result, message, server);
ThrowFailed(tcs, ex);
}
return await tcs.Task.ForAwait();
}
internal Exception GetException(WriteResult result, Message message, ServerEndPoint server)
{
switch (result)
......@@ -2187,7 +2220,9 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
if (message.IsFireAndForget)
{
TryPushMessageToBridge(message, processor, null, ref server);
#pragma warning disable CS0618
TryPushMessageToBridgeSync(message, processor, null, ref server);
#pragma warning restore CS0618
Interlocked.Increment(ref fireAndForgets);
return default(T);
}
......@@ -2197,7 +2232,9 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
lock (source)
{
var result = TryPushMessageToBridge(message, processor, source, ref server);
#pragma warning disable CS0618
var result = TryPushMessageToBridgeSync(message, processor, source, ref server);
#pragma warning restore CS0618
if (result != WriteResult.Success)
{
throw GetException(result, message, server);
......
......@@ -71,12 +71,12 @@ private async Task CloneAsync(string path, PipeReader from, PipeWriter to)
arr = new ArraySegment<byte>(tmp, 0, segment.Length);
leased = true;
}
await file.WriteAsync(arr.Array, arr.Offset, arr.Count);
await file.FlushAsync();
await file.WriteAsync(arr.Array, arr.Offset, arr.Count).ForAwait();
await file.FlushAsync().ForAwait();
if (leased) ArrayPool<byte>.Shared.Return(arr.Array);
// and flush it upstream
await to.WriteAsync(segment);
await to.WriteAsync(segment).ForAwait();
}
}
from.AdvanceTo(buffer.End);
......
......@@ -5,7 +5,9 @@
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;
namespace StackExchange.Redis
{
......@@ -92,6 +94,7 @@ public long SubscriptionCount
public void Dispose()
{
isDisposed = true;
ShutdownSubscriptionQueue();
using (var tmp = physical)
{
physical = null;
......@@ -123,45 +126,64 @@ public void ReportNextFailure()
public void TryConnect(TextWriter log) => GetConnection(log);
public WriteResult TryWrite(Message message, bool isSlave)
private WriteResult QueueOrFailMessage(Message message)
{
if (isDisposed) throw new ObjectDisposedException(Name);
if (!IsConnected)
if (message.IsInternalCall && message.Command != RedisCommand.QUIT)
{
if (message.IsInternalCall && message.Command != RedisCommand.QUIT)
{
// you can go in the queue, but we won't be starting
// a worker, because the handshake has not completed
var queue = _preconnectBacklog;
lock (queue)
{
queue.Enqueue(message);
}
message.SetEnqueued(null);
return WriteResult.Success; // we'll take it...
}
else
// you can go in the queue, but we won't be starting
// a worker, because the handshake has not completed
var queue = _preconnectBacklog;
lock (queue)
{
// sorry, we're just not ready for you yet;
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.NoConnectionAvailable;
queue.Enqueue(message);
}
message.SetEnqueued(null);
return WriteResult.Success; // we'll take it...
}
var physical = this.physical;
if (physical == null)
else
{
// sorry, we're just not ready for you yet;
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.NoConnectionAvailable;
}
}
private WriteResult FailDueToNoConnection(Message message)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.NoConnectionAvailable;
}
[Obsolete("prefer async")]
public WriteResult TryWriteSync(Message message, bool isSlave)
{
if (isDisposed) throw new ObjectDisposedException(Name);
if (!IsConnected) return QueueOrFailMessage(message);
var result = WriteMessageTakingWriteLock(physical, message);
var physical = this.physical;
if (physical == null) return FailDueToNoConnection(message);
#pragma warning disable CS0618
var result = WriteMessageTakingWriteLockSync(physical, message);
#pragma warning restore CS0618
LogNonPreferred(message.Flags, isSlave);
return result;
}
public ValueTask<WriteResult> TryWriteAsync(Message message, bool isSlave)
{
if (isDisposed) throw new ObjectDisposedException(Name);
if (!IsConnected) return new ValueTask<WriteResult>(QueueOrFailMessage(message));
var physical = this.physical;
if (physical == null) return new ValueTask<WriteResult>(FailDueToNoConnection(message));
var result = WriteMessageTakingWriteLockAsync(physical, message);
LogNonPreferred(message.Flags, isSlave);
return result;
}
......@@ -200,6 +222,70 @@ internal void GetCounters(ConnectionCounters counters)
physical?.GetCounters(counters);
}
private Channel<PendingSubscriptionState> _subscriptionBackgroundQueue;
private static readonly UnboundedChannelOptions s_subscriptionQueueOptions = new UnboundedChannelOptions
{
AllowSynchronousContinuations = false, // we do *not* want the async work to end up on the caller's thread
SingleReader = true, // only one reader will be started per channel
SingleWriter = true, // writes will be synchronized, because order matters
};
private Channel<PendingSubscriptionState> GetSubscriptionQueue()
{
var queue = _subscriptionBackgroundQueue;
if (queue == null)
{
queue = Channel.CreateUnbounded<PendingSubscriptionState>(s_subscriptionQueueOptions);
var existing = Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, queue, null);
if (existing != null) return existing; // we didn't win, but that's fine
// we won (_subqueue is now queue)
// this means we have a new channel without a reader; let's fix that!
Task.Run(() => ExecuteSubscriptionLoop());
}
return queue;
}
private void ShutdownSubscriptionQueue()
{
try
{
Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null)?.Writer.TryComplete();
}
catch { }
}
private async Task ExecuteSubscriptionLoop() // pushes items that have been enqueued over the bridge
{
// note: this will execute on the default pool rather than our dedicated pool; I'm... OK with this
var queue = _subscriptionBackgroundQueue ?? Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null); // just to be sure we can read it!
try
{
while (await queue.Reader.WaitToReadAsync().ForAwait() && queue.Reader.TryRead(out var next))
{
try
{
if ((await TryWriteAsync(next.Message, next.IsSlave).ForAwait()) != WriteResult.Success)
{
next.Abort();
}
}
catch (Exception ex)
{
next.Fail(ex);
}
}
}
catch (Exception ex)
{
Multiplexer.OnInternalError(ex, ServerEndPoint?.EndPoint, ConnectionType);
}
}
internal bool TryEnqueueBackgroundSubscriptionWrite(PendingSubscriptionState state)
=> isDisposed ? false : (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
internal void GetOutstandingCount(out int inst, out int qs, out int @in)
{// defined as: PendingUnsentItems + SentItemsAwaitingResponse + ResponsesAwaitingAsyncCompletion
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
......@@ -267,7 +353,9 @@ internal void KeepAlive()
Multiplexer.Trace("Enqueue: " + msg);
Multiplexer.OnInfoMessage($"heartbeat ({physical?.LastWriteSecondsAgo}s >= {ServerEndPoint?.WriteEverySeconds}s, {physical?.GetSentAwaitingResponseCount()} waiting) '{msg.CommandAndKey}' on '{PhysicalName}' (v{features.Version})");
physical?.UpdateLastWriteTime(); // pre-emptively
var result = TryWrite(msg, ServerEndPoint.IsSlave);
#pragma warning disable CS0618
var result = TryWriteSync(msg, ServerEndPoint.IsSlave);
#pragma warning restore CS0618
if (result != WriteResult.Success)
{
......@@ -350,7 +438,9 @@ private Message DequeueNextPendingBacklog()
return _preconnectBacklog.Count == 0 ? null : _preconnectBacklog.Dequeue();
}
}
private void WritePendingBacklog(PhysicalConnection connection)
[Obsolete("prefer async")]
private void WritePendingBacklogSync(PhysicalConnection connection)
{
if (connection != null)
{
......@@ -358,7 +448,9 @@ private void WritePendingBacklog(PhysicalConnection connection)
do
{
next = DequeueNextPendingBacklog();
if (next != null) WriteMessageTakingWriteLock(connection, next);
#pragma warning disable CS0618
if (next != null) WriteMessageTakingWriteLockSync(connection, next);
#pragma warning restore CS0618
} while (next != null);
}
}
......@@ -385,7 +477,9 @@ internal void OnFullyEstablished(PhysicalConnection connection)
LastException = null;
Interlocked.Exchange(ref failConnectCount, 0);
ServerEndPoint.OnFullyEstablished(connection);
WritePendingBacklog(connection);
#pragma warning disable CS0618
WritePendingBacklogSync(connection);
#pragma warning restore CS0618
if (ConnectionType == ConnectionType.Interactive) ServerEndPoint.CheckInfoReplication();
}
......@@ -532,30 +626,69 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave)
{ // deliberately not taking a single lock here; we don't care if
// other threads manage to interleave - in fact, it would be desirable
// (to avoid a batch monopolising the connection)
WriteMessageTakingWriteLock(physical, message);
#pragma warning disable CS0618
WriteMessageTakingWriteLockSync(physical, message);
#pragma warning restore CS0618
LogNonPreferred(message.Flags, isSlave);
}
return true;
}
private readonly object SingleWriterLock = new object();
private readonly SemaphoreSlim _SingleWriterLock = new SemaphoreSlim(1);
private Message _activeMesssage;
/// <summary>
/// This writes a message to the output stream
/// </summary>
/// <param name="physical">The phsyical connection to write to.</param>
/// <param name="message">The message to be written.</param>
internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Message message)
{
Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point
private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message message)
{
WriteResult result;
var existingMessage = Interlocked.CompareExchange(ref _activeMesssage, message, null);
if (existingMessage != null)
{
Multiplexer?.OnInfoMessage($"reentrant call to WriteMessageTakingWriteLock for {message.CommandAndKey}, {existingMessage.CommandAndKey} is still active");
return WriteResult.NoConnectionAvailable;
}
physical.SetWriting();
var messageIsSent = false;
if (message is IMultiMessage)
{
SelectDatabaseInsideWriteLock(physical, message); // need to switch database *before* the transaction
foreach (var subCommand in ((IMultiMessage)message).GetMessages(physical))
{
result = WriteMessageToServerInsideWriteLock(physical, subCommand);
if (result != WriteResult.Success)
{
// we screwed up; abort; note that WriteMessageToServer already
// killed the underlying connection
Trace("Unable to write to server");
message.Fail(ConnectionFailureType.ProtocolFailure, null, "failure before write: " + result.ToString());
this.CompleteSyncOrAsync(message);
return result;
}
//The parent message (next) may be returned from GetMessages
//and should not be marked as sent again below
messageIsSent = messageIsSent || subCommand == message;
}
if (!messageIsSent)
{
message.SetRequestSent(); // well, it was attempted, at least...
}
return WriteResult.Success;
}
else
{
return WriteMessageToServerInsideWriteLock(physical, message);
}
}
private async Task<WriteResult> WriteMessageTakingDelayedWriteLockAsync(PhysicalConnection physical, Message message)
{
bool haveLock = false;
try
{
Monitor.TryEnter(SingleWriterLock, TimeoutMilliseconds, ref haveLock);
// WriteMessageTakingWriteLockAsync will have checked for immediate availability,
// so this is the fallback case - fine to go straight to "await"
haveLock = await _SingleWriterLock.WaitAsync(TimeoutMilliseconds).ForAwait();
if (!haveLock)
{
message.Cancel();
......@@ -564,67 +697,110 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me
return WriteResult.TimeoutBeforeWrite;
}
var existingMessage = Interlocked.CompareExchange(ref _activeMesssage, message, null);
if (existingMessage != null)
var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success)
{
Multiplexer?.OnInfoMessage($"reentrant call to WriteMessageTakingWriteLock for {message.CommandAndKey}, {existingMessage.CommandAndKey} is still active");
return WriteResult.NoConnectionAvailable;
result = await physical.FlushAsync(false).ForAwait();
}
physical.SetWriting();
var messageIsSent = false;
if (message is IMultiMessage)
{
SelectDatabaseInsideWriteLock(physical, message); // need to switch database *before* the transaction
foreach (var subCommand in ((IMultiMessage)message).GetMessages(physical))
{
result = WriteMessageToServerInsideWriteLock(physical, subCommand);
if (result != WriteResult.Success)
{
// we screwed up; abort; note that WriteMessageToServer already
// killed the underlying connection
Trace("Unable to write to server");
message.Fail(ConnectionFailureType.ProtocolFailure, null, "failure before write: " + result.ToString());
this.CompleteSyncOrAsync(message);
return result;
}
//The parent message (next) may be returned from GetMessages
//and should not be marked as sent again below
messageIsSent = messageIsSent || subCommand == message;
}
if (!messageIsSent)
{
message.SetRequestSent(); // well, it was attempted, at least...
}
result = WriteResult.Success;
}
else
physical.SetIdle();
return result;
}
catch (Exception ex) { return HandleWriteException(message, ex); }
finally { if (haveLock) ReleaseSingleWriterLock(message); }
}
[Obsolete("prefer async")]
internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical, Message message)
{
Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point
bool haveLock = false;
try
{
haveLock = _SingleWriterLock.Wait(TimeoutMilliseconds);
if (!haveLock)
{
result = WriteMessageToServerInsideWriteLock(physical, message);
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
this.CompleteSyncOrAsync(message);
return WriteResult.TimeoutBeforeWrite;
}
var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success)
{
result = physical.FlushSync();
#pragma warning disable CS0618
result = physical.FlushSync(false, TimeoutMilliseconds);
#pragma warning restore CS0618
}
physical.SetIdle();
return result;
}
catch (Exception ex)
{
var inner = new RedisConnectionException(ConnectionFailureType.InternalFailure, "Failed to write", ex);
message.SetExceptionAndComplete(inner, this);
result = WriteResult.WriteFailure;
}
finally
catch (Exception ex) { return HandleWriteException(message, ex); }
finally { if (haveLock) ReleaseSingleWriterLock(message); }
}
/// <summary>
/// This writes a message to the output stream
/// </summary>
/// <param name="physical">The phsyical connection to write to.</param>
/// <param name="message">The message to be written.</param>
internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnection physical, Message message)
{
Trace("Writing: " + message);
message.SetEnqueued(physical); // this also records the read/write stats at this point
bool haveLock = false;
try
{
if (haveLock)
// try to acquire it synchronously
haveLock = _SingleWriterLock.Wait(0);
if (!haveLock) return new ValueTask<WriteResult>(WriteMessageTakingDelayedWriteLockAsync(physical, message));
var result = WriteMessageInsideLock(physical, message);
if (result == WriteResult.Success)
{
Interlocked.CompareExchange(ref _activeMesssage, null, message); // remove if it is us
Monitor.Exit(SingleWriterLock);
var flush = physical.FlushAsync(false);
if (!flush.IsCompletedSuccessfully)
{
haveLock = false; // so we don't release prematurely
return new ValueTask<WriteResult>(CompleteWriteAndReleaseLockAsync(flush, message));
}
result = flush.Result; // we know it was completed, this is fine
}
physical.SetIdle();
return new ValueTask<WriteResult>(result);
}
catch (Exception ex) { return new ValueTask<WriteResult>(HandleWriteException(message, ex)); }
finally { if (haveLock) ReleaseSingleWriterLock(message); }
}
return result;
private async Task<WriteResult> CompleteWriteAndReleaseLockAsync(ValueTask<WriteResult> flush, Message message)
{
try { return await flush.ForAwait(); }
catch (Exception ex) { return HandleWriteException(message, ex); }
finally { ReleaseSingleWriterLock(message); }
}
private WriteResult HandleWriteException(Message message, Exception ex)
{
var inner = new RedisConnectionException(ConnectionFailureType.InternalFailure, "Failed to write", ex);
message.SetExceptionAndComplete(inner, this);
return WriteResult.WriteFailure;
}
private void ReleaseSingleWriterLock(Message message)
{
Interlocked.CompareExchange(ref _activeMesssage, null, message); // remove if it is us
_SingleWriterLock.Release();
}
private State ChangeState(State newState)
......
......@@ -232,8 +232,6 @@ private enum ReadMode : byte
public bool TransactionActive { get; internal set; }
partial void ShouldIgnoreConnect(ref bool ignore);
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
internal void Shutdown()
{
......@@ -275,7 +273,7 @@ public void Dispose()
private async Task AwaitedFlush(ValueTask<FlushResult> flush)
{
await flush;
await flush.ForAwait();
_writeStatus = WriteStatus.Flushed;
UpdateLastWriteTime();
}
......@@ -839,23 +837,50 @@ internal static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix
return WriteCrlf(span, offset);
}
internal WriteResult FlushSync(bool throwOnFailure = false)
private static async ValueTask<WriteResult> FlushAsync_Awaited(PhysicalConnection connection, ValueTask<FlushResult> flush, bool throwOnFailure)
{
try
{
await flush.ForAwait();
connection._writeStatus = WriteStatus.Flushed;
connection.UpdateLastWriteTime();
return WriteResult.Success;
}
catch (ConnectionResetException ex) when (!throwOnFailure)
{
connection.RecordConnectionFailed(ConnectionFailureType.SocketClosed, ex);
return WriteResult.WriteFailure;
}
}
[Obsolete("this is an anti-pattern; work to reduce reliance on this is in progress")]
internal WriteResult FlushSync(bool throwOnFailure, int millisecondsTimeout)
{
var flush = FlushAsync(throwOnFailure);
if (!flush.IsCompletedSuccessfully)
{
// here lies the evil
if (!flush.AsTask().Wait(millisecondsTimeout)) throw new TimeoutException("timeout while synchronously flushing");
}
return flush.Result;
}
internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure)
{
var tmp = _ioPipe?.Output;
if (tmp == null) return WriteResult.NoConnectionAvailable;
if (tmp == null) return new ValueTask<WriteResult>(WriteResult.NoConnectionAvailable);
try
{
_writeStatus = WriteStatus.Flushing;
var flush = tmp.FlushAsync();
if (!flush.IsCompletedSuccessfully) flush.AsTask().Wait();
if (!flush.IsCompletedSuccessfully) return FlushAsync_Awaited(this, flush, throwOnFailure);
_writeStatus = WriteStatus.Flushed;
UpdateLastWriteTime();
return WriteResult.Success;
return new ValueTask<WriteResult>(WriteResult.Success);
}
catch (ConnectionResetException ex) when (!throwOnFailure)
{
RecordConnectionFailed(ConnectionFailureType.SocketClosed, ex);
return WriteResult.WriteFailure;
return new ValueTask<WriteResult>(WriteResult.WriteFailure);
}
}
......@@ -874,7 +899,9 @@ private static void WriteUnifiedBlob(PipeWriter writer, byte[] value)
}
}
#pragma warning disable RCS1231 // Make parameter ref read-only.
private static void WriteUnifiedSpan(PipeWriter writer, ReadOnlySpan<byte> value)
#pragma warning restore RCS1231 // Make parameter ref read-only.
{
// ${len}\r\n = 3 + MaxInt32TextLen
// {value}\r\n = 2 + value.Length
......@@ -949,8 +976,8 @@ internal void WriteSha1AsHex(byte[] value)
for (int i = 0; i < value.Length; i++)
{
var b = value[i];
span[offset++] = ToHexNibble(value[i] >> 4);
span[offset++] = ToHexNibble(value[i] & 15);
span[offset++] = ToHexNibble(b >> 4);
span[offset++] = ToHexNibble(b & 15);
}
span[offset++] = (byte)'\r';
span[offset++] = (byte)'\n';
......@@ -1556,7 +1583,9 @@ private static RawResult ParseInlineProtocol(in RawResult line)
if (!line.HasValue) return RawResult.Nil; // incomplete line
int count = 0;
foreach (var token in line.GetInlineTokenizer()) count++;
#pragma warning disable IDE0059
foreach (var _ in line.GetInlineTokenizer()) count++;
#pragma warning restore IDE0059
var oversized = ArrayPool<RawResult>.Shared.Rent(count);
count = 0;
foreach (var token in line.GetInlineTokenizer())
......
......@@ -54,7 +54,7 @@ internal virtual T ExecuteSync<T>(Message message, ResultProcessor<T> processor,
return multiplexer.ExecuteSyncImpl<T>(message, processor, server);
}
internal virtual RedisFeatures GetFeatures(RedisKey key, CommandFlags flags, out ServerEndPoint server)
internal virtual RedisFeatures GetFeatures(in RedisKey key, CommandFlags flags, out ServerEndPoint server)
{
server = multiplexer.SelectServer(RedisCommand.PING, flags, key);
var version = server == null ? multiplexer.RawConfig.DefaultVersion : server.Version;
......@@ -116,7 +116,7 @@ private ResultProcessor.TimingProcessor.TimerMessage GetTimerMessage(CommandFlag
internal static class CursorUtils
{
internal const int Origin = 0, DefaultPageSize = 10;
internal static bool IsNil(RedisValue pattern)
internal static bool IsNil(in RedisValue pattern)
{
if (pattern.IsNullOrEmpty) return true;
if (pattern.IsInteger) return false;
......@@ -231,7 +231,7 @@ private enum State : byte
Disposed,
}
private void ProcessReply(ScanResult result)
private void ProcessReply(in ScanResult result)
{
currentCursor = nextCursor;
nextCursor = result.Cursor;
......
......@@ -7,6 +7,8 @@
using System.Text;
using System.Threading.Tasks;
#pragma warning disable RCS1231 // Make parameter ref read-only.
namespace StackExchange.Redis
{
internal sealed class RedisServer : RedisBase, IServer
......@@ -583,7 +585,7 @@ internal override T ExecuteSync<T>(Message message, ResultProcessor<T> processor
return base.ExecuteSync<T>(message, processor, server);
}
internal override RedisFeatures GetFeatures(RedisKey key, CommandFlags flags, out ServerEndPoint server)
internal override RedisFeatures GetFeatures(in RedisKey key, CommandFlags flags, out ServerEndPoint server)
{
server = this.server;
return new RedisFeatures(server.Version);
......@@ -607,7 +609,9 @@ public void SlaveOf(EndPoint master, CommandFlags flags = CommandFlags.None)
{
var del = Message.Create(0, CommandFlags.FireAndForget | CommandFlags.NoRedirect, RedisCommand.DEL, (RedisKey)configuration.TieBreaker);
del.SetInternalCall();
server.WriteDirectFireAndForget(del, ResultProcessor.Boolean);
#pragma warning disable CS0618
server.WriteDirectFireAndForgetSync(del, ResultProcessor.Boolean);
#pragma warning restore CS0618
}
ExecuteSync(slaveofMsg, ResultProcessor.DemandOK);
......@@ -617,7 +621,9 @@ public void SlaveOf(EndPoint master, CommandFlags flags = CommandFlags.None)
{
var pub = Message.Create(-1, CommandFlags.FireAndForget | CommandFlags.NoRedirect, RedisCommand.PUBLISH, (RedisValue)channel, RedisLiterals.Wildcard);
pub.SetInternalCall();
server.WriteDirectFireAndForget(pub, ResultProcessor.Int64);
#pragma warning disable CS0618
server.WriteDirectFireAndForgetSync(pub, ResultProcessor.Int64);
#pragma warning restore CS0618
}
}
......
......@@ -127,7 +127,7 @@ internal void ResendSubscriptions(ServerEndPoint server)
}
}
internal bool SubscriberConnected(RedisChannel channel = default(RedisChannel))
internal bool SubscriberConnected(in RedisChannel channel = default(RedisChannel))
{
var server = GetSubscribedServer(channel);
if (server != null) return server.IsConnected;
......@@ -149,7 +149,7 @@ internal long ValidateSubscriptions()
}
}
private sealed class Subscription
internal sealed class Subscription
{
private Action<RedisChannel, RedisValue> _asyncHandler, _syncHandler;
private ServerEndPoint owner;
......@@ -195,25 +195,79 @@ public bool Remove(bool asAsync, Action<RedisChannel, RedisValue> value)
public Task SubscribeToServer(ConnectionMultiplexer multiplexer, in RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
{
var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
var selected = multiplexer.SelectServer(cmd, flags, default(RedisKey));
var selected = multiplexer.SelectServer(RedisCommand.SUBSCRIBE, flags, default(RedisKey));
var bridge = selected?.GetBridge(ConnectionType.Subscription, true);
if (bridge == null) return null;
if (selected == null || Interlocked.CompareExchange(ref owner, selected, null) != null) return null;
// note: check we can create the message validly *before* we swap the owner over (Interlocked)
var state = PendingSubscriptionState.Create(channel, this, flags, true, internalCall, asyncState, selected.IsSlave);
var msg = Message.Create(-1, flags, cmd, channel);
if (internalCall) msg.SetInternalCall();
return selected.WriteDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
if (Interlocked.CompareExchange(ref owner, selected, null) != null) return null;
try
{
if (!bridge.TryEnqueueBackgroundSubscriptionWrite(state))
{
state.Abort();
return null;
}
return state.Task;
}
catch
{
// clear the owner if it is still us
Interlocked.CompareExchange(ref owner, null, selected);
throw;
}
}
public Task UnsubscribeFromServer(in RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
{
var oldOwner = Interlocked.Exchange(ref owner, null);
if (oldOwner == null) return null;
var bridge = oldOwner?.GetBridge(ConnectionType.Subscription, false);
if (bridge == null) return null;
var state = PendingSubscriptionState.Create(channel, this, flags, false, internalCall, asyncState, oldOwner.IsSlave);
var cmd = channel.IsPatternBased ? RedisCommand.PUNSUBSCRIBE : RedisCommand.UNSUBSCRIBE;
var msg = Message.Create(-1, flags, cmd, channel);
if (internalCall) msg.SetInternalCall();
return oldOwner.WriteDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
if (!bridge.TryEnqueueBackgroundSubscriptionWrite(state))
{
state.Abort();
return null;
}
return state.Task;
}
internal readonly struct PendingSubscriptionState
{
public override string ToString() => Message.ToString();
public Subscription Subscription { get; }
public Message Message { get; }
public bool IsSlave { get; }
public Task Task => _taskSource.Task;
private readonly TaskCompletionSource<bool> _taskSource;
public static PendingSubscriptionState Create(RedisChannel channel, Subscription subscription, CommandFlags flags, bool subscribe, bool internalCall, object asyncState, bool isSlave)
=> new PendingSubscriptionState(asyncState, channel, subscription, flags, subscribe, internalCall, isSlave);
public void Abort() => _taskSource.TrySetCanceled();
public void Fail(Exception ex) => _taskSource.TrySetException(ex);
private PendingSubscriptionState(object asyncState, RedisChannel channel, Subscription subscription, CommandFlags flags, bool subscribe, bool internalCall, bool isSlave)
{
var cmd = subscribe
? (channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE)
: (channel.IsPatternBased ? RedisCommand.PUNSUBSCRIBE : RedisCommand.UNSUBSCRIBE);
var msg = Message.Create(-1, flags, cmd, channel);
if (internalCall) msg.SetInternalCall();
var taskSource = TaskSource.Create<bool>(asyncState);
var source = ResultBox<bool>.Get(taskSource);
msg.SetSource(ResultProcessor.TrackSubscriptions, source);
Subscription = subscription;
_taskSource = taskSource;
Message = msg;
IsSlave = isSlave;
}
}
internal ServerEndPoint GetOwner() => Volatile.Read(ref owner);
......@@ -225,7 +279,9 @@ internal void Resubscribe(in RedisChannel channel, ServerEndPoint server)
var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
var msg = Message.Create(-1, CommandFlags.FireAndForget, cmd, channel);
msg.SetInternalCall();
server.WriteDirectFireAndForget(msg, ResultProcessor.TrackSubscriptions);
#pragma warning disable CS0618
server.WriteDirectFireAndForgetSync(msg, ResultProcessor.TrackSubscriptions);
#pragma warning restore CS0618
}
}
......
......@@ -287,7 +287,9 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
sb.AppendLine("checking conditions in the *early* path");
// need to get those sent ASAP; if they are stuck in the buffers, we die
multiplexer.Trace("Flushing and waiting for precondition responses");
connection.FlushSync(true); // make sure they get sent, so we can check for QUEUED (and the pre-conditions if necessary)
#pragma warning disable CS0618
connection.FlushSync(true, multiplexer.TimeoutMilliseconds); // make sure they get sent, so we can check for QUEUED (and the pre-conditions if necessary)
#pragma warning restore CS0618
if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds))
{
......@@ -344,7 +346,9 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
sb.AppendLine("checking conditions in the *late* path");
multiplexer.Trace("Flushing and waiting for precondition+queued responses");
connection.FlushSync(true); // make sure they get sent, so we can check for QUEUED (and the pre-conditions if necessary)
#pragma warning disable CS0618
connection.FlushSync(true, multiplexer.TimeoutMilliseconds); // make sure they get sent, so we can check for QUEUED (and the pre-conditions if necessary)
#pragma warning restore CS0618
if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds))
{
if (!AreAllConditionsSatisfied(multiplexer))
......
......@@ -488,8 +488,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{
hash = ParseSHA1(asciiHash); // external caller wants the hex bytes, not the ascii bytes
}
var sl = message as RedisDatabase.ScriptLoadMessage;
if (sl != null)
if (message is RedisDatabase.ScriptLoadMessage sl)
{
connection.BridgeCouldBeNull?.ServerEndPoint?.AddScript(sl.Script, asciiHash);
}
......@@ -1413,7 +1412,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return false;
}
StreamEntry[] entries = null;
StreamEntry[] entries;
if (skipStreamName)
{
......
......@@ -231,7 +231,10 @@ public void SetUnselectable(UnselectableFlags flags)
public override string ToString() => Format.ToString(EndPoint);
public WriteResult TryWrite(Message message) => GetBridge(message.Command)?.TryWrite(message, isSlave) ?? WriteResult.NoConnectionAvailable;
[Obsolete("prefer async")]
public WriteResult TryWriteSync(Message message) => GetBridge(message.Command)?.TryWriteSync(message, isSlave) ?? WriteResult.NoConnectionAvailable;
public ValueTask<WriteResult> TryWriteAsync(Message message) => GetBridge(message.Command)?.TryWriteAsync(message, isSlave) ?? new ValueTask<WriteResult>(WriteResult.NoConnectionAvailable);
internal void Activate(ConnectionType type, TextWriter log)
{
......@@ -262,20 +265,21 @@ internal void AutoConfigure(PhysicalConnection connection)
var features = GetFeatures();
Message msg;
#pragma warning disable CS0618
if (commandMap.IsAvailable(RedisCommand.CONFIG))
{
if (Multiplexer.RawConfig.KeepAlive <= 0)
{
msg = Message.Create(-1, flags, RedisCommand.CONFIG, RedisLiterals.GET, RedisLiterals.timeout);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.AutoConfigure);
WriteDirectOrQueueFireAndForgetSync(connection, msg, ResultProcessor.AutoConfigure);
}
msg = Message.Create(-1, flags, RedisCommand.CONFIG, RedisLiterals.GET, RedisLiterals.slave_read_only);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.AutoConfigure);
WriteDirectOrQueueFireAndForgetSync(connection, msg, ResultProcessor.AutoConfigure);
msg = Message.Create(-1, flags, RedisCommand.CONFIG, RedisLiterals.GET, RedisLiterals.databases);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.AutoConfigure);
WriteDirectOrQueueFireAndForgetSync(connection, msg, ResultProcessor.AutoConfigure);
}
if (commandMap.IsAvailable(RedisCommand.INFO))
{
......@@ -284,17 +288,17 @@ internal void AutoConfigure(PhysicalConnection connection)
{
msg = Message.Create(-1, flags, RedisCommand.INFO, RedisLiterals.replication);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.AutoConfigure);
WriteDirectOrQueueFireAndForgetSync(connection, msg, ResultProcessor.AutoConfigure);
msg = Message.Create(-1, flags, RedisCommand.INFO, RedisLiterals.server);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.AutoConfigure);
WriteDirectOrQueueFireAndForgetSync(connection, msg, ResultProcessor.AutoConfigure);
}
else
{
msg = Message.Create(-1, flags, RedisCommand.INFO);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.AutoConfigure);
WriteDirectOrQueueFireAndForgetSync(connection, msg, ResultProcessor.AutoConfigure);
}
}
else if (commandMap.IsAvailable(RedisCommand.SET))
......@@ -303,14 +307,15 @@ internal void AutoConfigure(PhysicalConnection connection)
RedisKey key = Multiplexer.UniqueId;
msg = Message.Create(0, flags, RedisCommand.SET, key, RedisLiterals.slave_read_only, RedisLiterals.PX, 1, RedisLiterals.NX);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.AutoConfigure);
WriteDirectOrQueueFireAndForgetSync(connection, msg, ResultProcessor.AutoConfigure);
}
if (commandMap.IsAvailable(RedisCommand.CLUSTER))
{
msg = Message.Create(-1, flags, RedisCommand.CLUSTER, RedisLiterals.NODES);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.ClusterNodes);
WriteDirectOrQueueFireAndForgetSync(connection, msg, ResultProcessor.ClusterNodes);
}
#pragma warning restore CS0618
}
private int _nextReplicaOffset;
......@@ -377,7 +382,10 @@ internal void GetOutstandingCount(RedisCommand command, out int inst, out int qs
{
inst = qs = @in = 0;
}
bridge.GetOutstandingCount(out inst, out qs, out @in);
else
{
bridge.GetOutstandingCount(out inst, out qs, out @in);
}
}
internal string GetProfile()
......@@ -521,9 +529,9 @@ internal bool CheckInfoReplication()
{
#pragma warning disable CS0618
var msg = Message.Create(-1, CommandFlags.FireAndForget | CommandFlags.HighPriority | CommandFlags.NoRedirect, RedisCommand.INFO, RedisLiterals.replication);
#pragma warning restore CS0618
msg.SetInternalCall();
WriteDirectFireAndForget(msg, ResultProcessor.AutoConfigure, bridge);
WriteDirectFireAndForgetSync(msg, ResultProcessor.AutoConfigure, bridge);
#pragma warning restore CS0618
return true;
}
return false;
......@@ -553,13 +561,36 @@ internal void OnHeartbeat()
}
}
private static async Task<T> WriteDirectAsync_Awaited<T>(ServerEndPoint @this, Message message, ValueTask<WriteResult> write, TaskCompletionSource<T> tcs)
{
var result = await write.ForAwait();
if (result != WriteResult.Success)
{
var ex = @this.Multiplexer.GetException(result, message, @this);
ConnectionMultiplexer.ThrowFailed(tcs, ex);
}
return await tcs.Task.ForAwait();
}
internal Task<T> WriteDirectAsync<T>(Message message, ResultProcessor<T> processor, object asyncState = null, PhysicalBridge bridge = null)
{
var tcs = TaskSource.Create<T>(asyncState);
var source = ResultBox<T>.Get(tcs);
message.SetSource(processor, source);
if (bridge == null) bridge = GetBridge(message.Command);
var result = bridge?.TryWrite(message, isSlave) ?? WriteResult.NoConnectionAvailable;
WriteResult result;
if (bridge == null)
{
result = WriteResult.NoConnectionAvailable;
}
else
{
var write = bridge.TryWriteAsync(message, isSlave);
if (!write.IsCompletedSuccessfully) return WriteDirectAsync_Awaited<T>(this, message, write, tcs);
result = write.Result;
}
if (result != WriteResult.Success)
{
var ex = Multiplexer.GetException(result, message, this);
......@@ -568,13 +599,16 @@ internal Task<T> WriteDirectAsync<T>(Message message, ResultProcessor<T> process
return tcs.Task;
}
internal void WriteDirectFireAndForget<T>(Message message, ResultProcessor<T> processor, PhysicalBridge bridge = null)
[Obsolete("prefer async")]
internal void WriteDirectFireAndForgetSync<T>(Message message, ResultProcessor<T> processor, PhysicalBridge bridge = null)
{
if (message != null)
{
message.SetSource(processor, null);
Multiplexer.Trace("Enqueue: " + message);
(bridge ?? GetBridge(message.Command)).TryWrite(message, isSlave);
#pragma warning disable CS0618
(bridge ?? GetBridge(message.Command)).TryWriteSync(message, isSlave);
#pragma warning restore CS0618
}
}
......@@ -624,7 +658,40 @@ internal string Summary()
return sb.ToString();
}
internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection, Message message, ResultProcessor<T> processor)
internal ValueTask WriteDirectOrQueueFireAndForgetAsync<T>(PhysicalConnection connection, Message message, ResultProcessor<T> processor)
{
async ValueTask Awaited(ValueTask<WriteResult> l_result) => await l_result.ForAwait();
if (message != null)
{
message.SetSource(processor, null);
ValueTask<WriteResult> result;
if (connection == null)
{
Multiplexer.Trace("Enqueue: " + message);
result = GetBridge(message.Command).TryWriteAsync(message, isSlave);
}
else
{
Multiplexer.Trace("Writing direct: " + message);
var bridge = connection.BridgeCouldBeNull;
if (bridge == null)
{
throw new ObjectDisposedException(connection.ToString());
}
else
{
result = bridge.WriteMessageTakingWriteLockAsync(connection, message);
}
}
if (!result.IsCompletedSuccessfully) return Awaited(result);
}
return default;
}
[Obsolete("prefer aysnc")]
internal void WriteDirectOrQueueFireAndForgetSync<T>(PhysicalConnection connection, Message message, ResultProcessor<T> processor)
{
if (message != null)
{
......@@ -632,7 +699,9 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
if (connection == null)
{
Multiplexer.Trace("Enqueue: " + message);
GetBridge(message.Command).TryWrite(message, isSlave);
#pragma warning disable CS0618
GetBridge(message.Command).TryWriteSync(message, isSlave);
#pragma warning restore CS0618
}
else
{
......@@ -644,11 +713,14 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
}
else
{
bridge.WriteMessageTakingWriteLock(connection, message);
#pragma warning disable CS0618
bridge.WriteMessageTakingWriteLockSync(connection, message);
#pragma warning restore CS0618
}
}
}
}
private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
{
if (Multiplexer.IsDisposed) return null;
......@@ -658,13 +730,13 @@ private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
return bridge;
}
private Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
private async Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
{
Multiplexer.LogLocked(log, "Server handshake");
if (connection == null)
{
Multiplexer.Trace("No connection!?");
return Task.CompletedTask;
return;
}
Message msg;
string password = Multiplexer.RawConfig.Password;
......@@ -673,8 +745,9 @@ private Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
Multiplexer.LogLocked(log, "Authenticating (password)");
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.AUTH, (RedisValue)password);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.DemandOK);
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.DemandOK).ForAwait();
}
if (Multiplexer.CommandMap.IsAvailable(RedisCommand.CLIENT))
{
string name = Multiplexer.ClientName;
......@@ -686,7 +759,7 @@ private Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
Multiplexer.LogLocked(log, "Setting client name: {0}", name);
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.CLIENT, RedisLiterals.SETNAME, (RedisValue)name);
msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.DemandOK);
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.DemandOK).ForAwait();
}
}
}
......@@ -694,7 +767,7 @@ private Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
var bridge = connection.BridgeCouldBeNull;
if (bridge == null)
{
return Task.CompletedTask;
return;
}
var connType = bridge.ConnectionType;
......@@ -706,7 +779,7 @@ private Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
Multiplexer.LogLocked(log, "Sending critical tracer: {0}", bridge);
var tracer = GetTracerMessage(true);
tracer = LoggingMessage.Create(log, tracer);
WriteDirectOrQueueFireAndForget(connection, tracer, ResultProcessor.EstablishConnection);
await WriteDirectOrQueueFireAndForgetAsync(connection, tracer, ResultProcessor.EstablishConnection).ForAwait();
// note: this **must** be the last thing on the subscription handshake, because after this
// we will be in subscriber mode: regular commands cannot be sent
......@@ -716,11 +789,11 @@ private Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
if (configChannel != null)
{
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.SUBSCRIBE, (RedisChannel)configChannel);
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.TrackSubscriptions);
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.TrackSubscriptions).ForAwait();
}
}
Multiplexer.LogLocked(log, "Flushing outbound buffer");
return connection.FlushAsync();
await connection.FlushAsync().ForAwait();
}
private void SetConfig<T>(ref T field, T value, [CallerMemberName] string caller = null)
......
......@@ -61,10 +61,10 @@ public ServerSelectionStrategy(ConnectionMultiplexer multiplexer)
/// Computes the hash-slot that would be used by the given key
/// </summary>
/// <param name="key">The <see cref="RedisKey"/> to determine a slot ID for.</param>
public int HashSlot(RedisKey key)
public int HashSlot(in RedisKey key)
=> ServerType == ServerType.Standalone ? NoSlot : GetClusterSlot(key);
private static unsafe int GetClusterSlot(RedisKey key)
private static unsafe int GetClusterSlot(in RedisKey key)
{
//HASH_SLOT = CRC16(key) mod 16384
if (key.IsNull) return NoSlot;
......@@ -107,7 +107,7 @@ public ServerEndPoint Select(Message message)
return Select(slot, message.Command, message.Flags);
}
public ServerEndPoint Select(RedisCommand command, RedisKey key, CommandFlags flags)
public ServerEndPoint Select(RedisCommand command, in RedisKey key, CommandFlags flags)
{
int slot = ServerType == ServerType.Cluster ? HashSlot(key) : NoSlot;
return Select(slot, command, flags);
......@@ -155,7 +155,9 @@ public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isM
else
{
message.PrepareToResend(resendVia, isMoved);
retry = resendVia.TryWrite(message) == WriteResult.Success;
#pragma warning disable CS0618
retry = resendVia.TryWriteSync(message) == WriteResult.Success;
#pragma warning restore CS0618
}
}
......@@ -187,7 +189,7 @@ internal int CombineSlot(int oldSlot, int newSlot)
return oldSlot == newSlot ? oldSlot : MultipleSlots;
}
internal int CombineSlot(int oldSlot, RedisKey key)
internal int CombineSlot(int oldSlot, in RedisKey key)
{
if (oldSlot == MultipleSlots || key.IsNull) return oldSlot;
......
......@@ -26,6 +26,7 @@ public static Task<T> ObserveErrors<T>(this Task<T> task)
}
public static ConfiguredTaskAwaitable ForAwait(this Task task) => task.ConfigureAwait(false);
public static ConfiguredValueTaskAwaitable ForAwait(this ValueTask task) => task.ConfigureAwait(false);
public static ConfiguredTaskAwaitable<T> ForAwait<T>(this Task<T> task) => task.ConfigureAwait(false);
public static ConfiguredValueTaskAwaitable<T> ForAwait<T>(this ValueTask<T> task) => task.ConfigureAwait(false);
......@@ -37,10 +38,10 @@ public static Task<T> ObserveErrors<T>(this Task<T> task)
public static async Task<bool> TimeoutAfter(this Task task, int timeoutMs)
{
var cts = new CancellationTokenSource();
if (task == await Task.WhenAny(task, Task.Delay(timeoutMs, cts.Token)).ConfigureAwait(false))
if (task == await Task.WhenAny(task, Task.Delay(timeoutMs, cts.Token)).ForAwait())
{
cts.Cancel();
await task.ConfigureAwait(false);
await task.ForAwait();
return true;
}
else
......
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
......@@ -31,5 +34,59 @@ public void SubscriberCount()
Assert.Contains(channel, channels);
}
}
[Fact]
public async Task SubscriberCountAsync()
{
using (var conn = Create())
{
RedisChannel channel = Me() + Guid.NewGuid();
var server = conn.GetServer(conn.GetEndPoints()[0]);
var channels = await server.SubscriptionChannelsAsync(Me() + "*").WithTimeout(2000);
Assert.DoesNotContain(channel, channels);
long justWork = await server.SubscriptionPatternCountAsync().WithTimeout(2000);
var count = await server.SubscriptionSubscriberCountAsync(channel).WithTimeout(2000);
Assert.Equal(0, count);
await conn.GetSubscriber().SubscribeAsync(channel, delegate { }).WithTimeout(2000);
count = await server.SubscriptionSubscriberCountAsync(channel).WithTimeout(2000);
Assert.Equal(1, count);
channels = await server.SubscriptionChannelsAsync(Me() + "*").WithTimeout(2000);
Assert.Contains(channel, channels);
}
}
}
static class Util
{
public static async Task WithTimeout(this Task task, int timeoutMs,
[CallerMemberName] string caller = null, [CallerLineNumber] int line = 0)
{
var cts = new CancellationTokenSource();
if (task == await Task.WhenAny(task, Task.Delay(timeoutMs, cts.Token)).ForAwait())
{
cts.Cancel();
await task.ForAwait();
}
else
{
throw new TimeoutException($"timout from {caller} line {line}");
}
}
public static async Task<T> WithTimeout<T>(this Task<T> task, int timeoutMs,
[CallerMemberName] string caller = null, [CallerLineNumber] int line = 0)
{
var cts = new CancellationTokenSource();
if (task == await Task.WhenAny(task, Task.Delay(timeoutMs, cts.Token)).ForAwait())
{
cts.Cancel();
return await task.ForAwait();
}
else
{
throw new TimeoutException($"timout from {caller} line {line}");
}
}
}
}
......@@ -8,55 +8,16 @@ namespace TestConsole
{
internal static class Program
{
private const int taskCount = 10;
private const int totalRecords = 100000;
private static void Main()
{
#if SEV2
Pipelines.Sockets.Unofficial.SocketConnection.AssertDependencies();
Console.WriteLine("We loaded the things...");
// Console.ReadLine();
#endif
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
var taskList = new List<Task>();
var connection = ConnectionMultiplexer.Connect("127.0.0.1");
for (int i = 0; i < taskCount; i++)
{
var i1 = i;
var task = new Task(() => Run(i1, connection));
task.Start();
taskList.Add(task);
}
Task.WaitAll(taskList.ToArray());
stopwatch.Stop();
Console.WriteLine($"Done. {stopwatch.ElapsedMilliseconds}");
Console.ReadLine();
}
private static void Run(int taskId, ConnectionMultiplexer connection)
public static async Task Main()
{
Console.WriteLine($"{taskId} Started");
var database = connection.GetDatabase(0);
for (int i = 0; i < totalRecords; i++)
{
database.StringSet(i.ToString(), i.ToString());
}
Console.WriteLine($"{taskId} Insert completed");
for (int i = 0; i < totalRecords; i++)
using (var muxer = await ConnectionMultiplexer.ConnectAsync("127.0.0.1"))
{
var result = database.StringGet(i.ToString());
var db = muxer.GetDatabase();
var sub = muxer.GetSubscriber();
Console.WriteLine("subscribing");
ChannelMessageQueue queue = await sub.SubscribeAsync("yolo");
Console.WriteLine("subscribed");
}
Console.WriteLine($"{taskId} Completed");
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment