Commit 795379da authored by Marc Gravell's avatar Marc Gravell

remove the concept of the unsent queue; the pipeline **is** that queue - no...

remove the concept of the unsent queue; the pipeline **is** that queue - no need to duplicate (and significantly simplifies); if you've successfully pushed a message to the bridge: *it is in the pipe*; also removes "ar", "wr", "wq", "qu" and "pending" counters
parent 60f90766
...@@ -18,18 +18,6 @@ public partial class ConnectionMultiplexer ...@@ -18,18 +18,6 @@ public partial class ConnectionMultiplexer
if (ownsSocketManager) socketManager?.Dispose(); if (ownsSocketManager) socketManager?.Dispose();
socketManager = null; socketManager = null;
} }
internal void RequestWrite(PhysicalBridge bridge, bool forced)
{
if (bridge == null) return;
var tmp = SocketManager;
if (tmp != null)
{
Trace("Requesting write: " + bridge.Name);
tmp.RequestWrite(bridge, forced);
}
}
partial void OnWriterCreated(); partial void OnWriterCreated();
} }
} }
...@@ -2007,10 +2007,6 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser ...@@ -2007,10 +2007,6 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
} }
else else
{ {
#if FEATURE_SOCKET_MODE_POLL
var mgrState = socketManager.State;
var lastError = socketManager.LastErrorTimeRelative();
#endif
var sb = new StringBuilder("Timeout performing ").Append(message.CommandAndKey); var sb = new StringBuilder("Timeout performing ").Append(message.CommandAndKey);
data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) }; data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) };
void add(string lk, string sk, string v) void add(string lk, string sk, string v)
...@@ -2019,20 +2015,12 @@ void add(string lk, string sk, string v) ...@@ -2019,20 +2015,12 @@ void add(string lk, string sk, string v)
sb.Append(", ").Append(sk).Append(": ").Append(v); sb.Append(", ").Append(sk).Append(": ").Append(v);
} }
int queue = server.GetOutstandingCount(message.Command, out int inst, out int qu, out int qs, out int qc, out int wr, out int wq, out int @in, out int ar); int queue = server.GetOutstandingCount(message.Command, out int inst, out int qs, out int qc, out int @in);
add("Instantaneous", "inst", inst.ToString()); add("Instantaneous", "inst", inst.ToString());
#if FEATURE_SOCKET_MODE_POLL
add("Manager-State", "mgr", mgrState.ToString());
add("Last-Error", "err", lastError);
#endif
add("Queue-Length", "queue", queue.ToString()); add("Queue-Length", "queue", queue.ToString());
add("Queue-Outstanding", "qu", qu.ToString());
add("Queue-Awaiting-Response", "qs", qs.ToString()); add("Queue-Awaiting-Response", "qs", qs.ToString());
add("Queue-Completion-Outstanding", "qc", qc.ToString()); add("Queue-Completion-Outstanding", "qc", qc.ToString());
add("Active-Writers", "wr", wr.ToString());
add("Write-Queue", "wq", wq.ToString());
add("Inbound-Bytes", "in", @in.ToString()); add("Inbound-Bytes", "in", @in.ToString());
add("Active-Readers", "ar", ar.ToString());
add("Client-Name", "clientName", ClientName); add("Client-Name", "clientName", ClientName);
add("Server-Endpoint", "serverEndpoint", server.EndPoint.ToString()); add("Server-Endpoint", "serverEndpoint", server.EndPoint.ToString());
......
...@@ -17,12 +17,6 @@ internal partial class ResultBox ...@@ -17,12 +17,6 @@ internal partial class ResultBox
public partial interface IServer public partial interface IServer
{ {
/// <summary>
/// Show what is in the pending (unsent) queue
/// </summary>
/// <param name="maxCount">The maximum count to list.</param>
string ListPending(int maxCount);
/// <summary> /// <summary>
/// Get the value of key. If the key does not exist the special value nil is returned. An error is returned if the value stored at key is not a string, because GET only handles string values. /// Get the value of key. If the key does not exist the special value nil is returned. An error is returned if the value stored at key is not a string, because GET only handles string values.
/// </summary> /// </summary>
...@@ -114,21 +108,11 @@ internal void SimulateConnectionFailure() ...@@ -114,21 +108,11 @@ internal void SimulateConnectionFailure()
interactive?.SimulateConnectionFailure(); interactive?.SimulateConnectionFailure();
subscription?.SimulateConnectionFailure(); subscription?.SimulateConnectionFailure();
} }
internal string ListPending(int maxCount)
{
var sb = new StringBuilder();
interactive?.ListPending(sb, maxCount);
subscription?.ListPending(sb, maxCount);
return sb.ToString();
}
} }
internal partial class RedisServer internal partial class RedisServer
{ {
void IServer.SimulateConnectionFailure() => server.SimulateConnectionFailure(); void IServer.SimulateConnectionFailure() => server.SimulateConnectionFailure();
string IServer.ListPending(int maxCount) => server.ListPending(maxCount);
void IServer.Crash() void IServer.Crash()
{ {
// using DB-0 because we also use "DEBUG OBJECT", which is db-centric // using DB-0 because we also use "DEBUG OBJECT", which is db-centric
...@@ -225,11 +209,6 @@ internal void SimulateConnectionFailure() ...@@ -225,11 +209,6 @@ internal void SimulateConnectionFailure()
} }
physical?.RecordConnectionFailed(ConnectionFailureType.SocketFailure); physical?.RecordConnectionFailed(ConnectionFailureType.SocketFailure);
} }
internal void ListPending(StringBuilder sb, int maxCount)
{
queue.ListPending(sb, maxCount);
}
} }
internal partial class PhysicalConnection internal partial class PhysicalConnection
......
...@@ -22,8 +22,6 @@ internal sealed partial class PhysicalBridge : IDisposable ...@@ -22,8 +22,6 @@ internal sealed partial class PhysicalBridge : IDisposable
{ {
internal readonly string Name; internal readonly string Name;
internal int inWriteQueue = 0;
private const int ProfileLogSamples = 10; private const int ProfileLogSamples = 10;
private const double ProfileLogSeconds = (ConnectionMultiplexer.MillisecondsPerHeartbeat * ProfileLogSamples) / 1000.0; private const double ProfileLogSeconds = (ConnectionMultiplexer.MillisecondsPerHeartbeat * ProfileLogSamples) / 1000.0;
...@@ -32,7 +30,9 @@ internal sealed partial class PhysicalBridge : IDisposable ...@@ -32,7 +30,9 @@ internal sealed partial class PhysicalBridge : IDisposable
private readonly CompletionManager completionManager; private readonly CompletionManager completionManager;
private readonly long[] profileLog = new long[ProfileLogSamples]; private readonly long[] profileLog = new long[ProfileLogSamples];
private readonly MessageQueue queue = new MessageQueue();
private readonly Queue<Message> _preconnectBacklog = new Queue<Message>();
private int activeWriters = 0; private int activeWriters = 0;
private int beating; private int beating;
private int failConnectCount = 0; private int failConnectCount = 0;
...@@ -123,7 +123,11 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -123,7 +123,11 @@ public bool TryEnqueue(Message message, bool isSlave)
{ {
// you can go in the queue, but we won't be starting // you can go in the queue, but we won't be starting
// a worker, because the handshake has not completed // a worker, because the handshake has not completed
queue.Push(message); var queue = _preconnectBacklog;
lock(queue)
{
queue.Enqueue(message);
}
message.SetEnqueued(); message.SetEnqueued();
return true; return true;
} }
...@@ -134,15 +138,13 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -134,15 +138,13 @@ public bool TryEnqueue(Message message, bool isSlave)
} }
} }
bool reqWrite = queue.Push(message); var physical = this.physical;
message.SetEnqueued(); if (physical == null) return false;
WriteMessageDirect(physical, message);
LogNonPreferred(message.Flags, isSlave); LogNonPreferred(message.Flags, isSlave);
Trace("Now pending: " + GetPendingCount());
if (reqWrite)
{
Multiplexer.RequestWrite(this, false);
}
return true; return true;
} }
...@@ -171,22 +173,8 @@ internal void AppendProfile(StringBuilder sb) ...@@ -171,22 +173,8 @@ internal void AppendProfile(StringBuilder sb)
sb.Append(" (").Append(rate.ToString("N2")).Append(" ops/s; spans ").Append(ProfileLogSeconds).Append("s)"); sb.Append(" (").Append(rate.ToString("N2")).Append(" ops/s; spans ").Append(ProfileLogSeconds).Append("s)");
} }
internal bool ConfirmRemoveFromWriteQueue()
{
lock (queue.SyncLock)
{
if (queue.Count() == 0)
{
Interlocked.Exchange(ref inWriteQueue, 0);
return true;
}
}
return false;
}
internal void GetCounters(ConnectionCounters counters) internal void GetCounters(ConnectionCounters counters)
{ {
counters.PendingUnsentItems = queue.Count();
counters.OperationCount = OperationCount; counters.OperationCount = OperationCount;
counters.SocketCount = Interlocked.Read(ref socketCount); counters.SocketCount = Interlocked.Read(ref socketCount);
counters.WriterCount = Interlocked.CompareExchange(ref activeWriters, 0, 0); counters.WriterCount = Interlocked.CompareExchange(ref activeWriters, 0, 0);
...@@ -195,28 +183,20 @@ internal void GetCounters(ConnectionCounters counters) ...@@ -195,28 +183,20 @@ internal void GetCounters(ConnectionCounters counters)
physical?.GetCounters(counters); physical?.GetCounters(counters);
} }
internal int GetOutstandingCount(out int inst, out int qu, out int qs, out int qc, out int wr, out int wq, out int @in, out int ar) internal int GetOutstandingCount(out int inst, out int qs, out int qc, out int @in)
{// defined as: PendingUnsentItems + SentItemsAwaitingResponse + ResponsesAwaitingAsyncCompletion {// defined as: PendingUnsentItems + SentItemsAwaitingResponse + ResponsesAwaitingAsyncCompletion
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog)); inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
qu = queue.Count();
var tmp = physical; var tmp = physical;
if(tmp == null) if(tmp == null)
{ {
qs = @in = ar = 0; qs = @in = 0;
} else } else
{ {
qs = tmp.GetSentAwaitingResponseCount(); qs = tmp.GetSentAwaitingResponseCount();
@in = tmp.GetAvailableInboundBytes(out ar); @in = tmp.GetAvailableInboundBytes();
} }
qc = completionManager.GetOutstandingCount(); qc = completionManager.GetOutstandingCount();
wr = Interlocked.CompareExchange(ref activeWriters, 0, 0); return qs + qc;
wq = Interlocked.CompareExchange(ref inWriteQueue, 0, 0);
return qu + qs + qc;
}
internal int GetPendingCount()
{
return queue.Count();
} }
internal string GetStormLog() internal string GetStormLog()
...@@ -224,7 +204,6 @@ internal string GetStormLog() ...@@ -224,7 +204,6 @@ internal string GetStormLog()
var sb = new StringBuilder("Storm log for ").Append(Format.ToString(ServerEndPoint.EndPoint)).Append(" / ").Append(ConnectionType) var sb = new StringBuilder("Storm log for ").Append(Format.ToString(ServerEndPoint.EndPoint)).Append(" / ").Append(ConnectionType)
.Append(" at ").Append(DateTime.UtcNow) .Append(" at ").Append(DateTime.UtcNow)
.AppendLine().AppendLine(); .AppendLine().AppendLine();
queue.GetStormLog(sb);
physical?.GetStormLog(sb); physical?.GetStormLog(sb);
completionManager.GetStormLog(sb); completionManager.GetStormLog(sb);
sb.Append("Circular op-count snapshot:"); sb.Append("Circular op-count snapshot:");
...@@ -309,14 +288,6 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti ...@@ -309,14 +288,6 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti
{ {
Trace("OnDisconnected"); Trace("OnDisconnected");
// if the next thing in the pipe is a PING, we can tell it that we failed (this really helps spot doomed connects)
var ping = queue.DequeueUnsentPing(out int count);
if (ping != null)
{
Trace("Marking PING as failed (queue length: " + count + ")");
ping.Fail(failureType, null);
CompleteSyncOrAsync(ping);
}
oldState = default(State); // only defined when isCurrent = true oldState = default(State); // only defined when isCurrent = true
if (isCurrent = (physical == connection)) if (isCurrent = (physical == connection))
{ {
...@@ -339,6 +310,25 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti ...@@ -339,6 +310,25 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti
} }
} }
private Message DequeueNextPendingBacklog()
{
lock(_preconnectBacklog)
{
return _preconnectBacklog.Count == 0 ? null : _preconnectBacklog.Dequeue();
}
}
void WritePendingBacklog(PhysicalConnection connection)
{
if(connection != null)
{
Message next;
do
{
next = DequeueNextPendingBacklog();
WriteMessageDirect(connection, next);
} while (next != null);
}
}
internal void OnFullyEstablished(PhysicalConnection connection) internal void OnFullyEstablished(PhysicalConnection connection)
{ {
Trace("OnFullyEstablished"); Trace("OnFullyEstablished");
...@@ -348,8 +338,9 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -348,8 +338,9 @@ internal void OnFullyEstablished(PhysicalConnection connection)
LastException = null; LastException = null;
Interlocked.Exchange(ref failConnectCount, 0); Interlocked.Exchange(ref failConnectCount, 0);
ServerEndPoint.OnFullyEstablished(connection); ServerEndPoint.OnFullyEstablished(connection);
Multiplexer.RequestWrite(this, true); WritePendingBacklog(connection);
if(ConnectionType == ConnectionType.Interactive) ServerEndPoint.CheckInfoReplication();
if (ConnectionType == ConnectionType.Interactive) ServerEndPoint.CheckInfoReplication();
} }
else else
{ {
...@@ -389,10 +380,6 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -389,10 +380,6 @@ internal void OnHeartbeat(bool ifConnectedOnly)
using (snapshot) { } // dispose etc using (snapshot) { } // dispose etc
TryConnect(null); TryConnect(null);
} }
if (!ifConnectedOnly)
{
AbortUnsent();
}
break; break;
case (int)State.ConnectedEstablishing: case (int)State.ConnectedEstablishing:
case (int)State.ConnectedEstablished: case (int)State.ConnectedEstablished:
...@@ -426,7 +413,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -426,7 +413,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out bool ignore, out State oldState); OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out bool ignore, out State oldState);
} }
} }
else if (!queue.Any() && tmp.GetSentAwaitingResponseCount() != 0) else if (tmp.GetSentAwaitingResponseCount() != 0)
{ {
// there's a chance this is a dead socket; sending data will shake that // there's a chance this is a dead socket; sending data will shake that
// up a bit, so if we have an empty unsent queue and a non-empty sent // up a bit, so if we have an empty unsent queue and a non-empty sent
...@@ -439,17 +426,12 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -439,17 +426,12 @@ internal void OnHeartbeat(bool ifConnectedOnly)
Interlocked.Exchange(ref connectTimeoutRetryCount, 0); Interlocked.Exchange(ref connectTimeoutRetryCount, 0);
if (!ifConnectedOnly) if (!ifConnectedOnly)
{ {
AbortUnsent();
Multiplexer.Trace("Resurrecting " + ToString()); Multiplexer.Trace("Resurrecting " + ToString());
GetConnection(null); GetConnection(null);
} }
break; break;
default: default:
Interlocked.Exchange(ref connectTimeoutRetryCount, 0); Interlocked.Exchange(ref connectTimeoutRetryCount, 0);
if (!ifConnectedOnly)
{
AbortUnsent();
}
break; break;
} }
} }
...@@ -493,159 +475,65 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave) ...@@ -493,159 +475,65 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave)
{ {
return false; return false;
} }
bool reqWrite = false;
var physical = this.physical;
if (physical == null) return false;
foreach (var message in messages) foreach (var message in messages)
{ // deliberately not taking a single lock here; we don't care if { // deliberately not taking a single lock here; we don't care if
// other threads manage to interleave - in fact, it would be desirable // other threads manage to interleave - in fact, it would be desirable
// (to avoid a batch monopolising the connection) // (to avoid a batch monopolising the connection)
if (queue.Push(message)) reqWrite = true; WriteMessageDirect(physical, message);
LogNonPreferred(message.Flags, isSlave); LogNonPreferred(message.Flags, isSlave);
} }
Trace("Now pending: " + GetPendingCount());
if (reqWrite) // was empty before
{
Multiplexer.RequestWrite(this, false);
}
return true; return true;
} }
private readonly object WriteLock = new object();
/// <summary> /// <summary>
/// This writes a message **directly** to the output stream; note /// This writes a message to the output stream
/// that this ignores the queue, so should only be used *either*
/// from the regular dequeue loop, *or* from the "I've just
/// connected" handshake (when there is no dequeue loop) - otherwise,
/// you can pretty much assume you're going to destroy the stream
/// </summary> /// </summary>
internal bool WriteMessageDirect(PhysicalConnection tmp, Message next) internal bool WriteMessageDirect(PhysicalConnection physical, Message next)
{ {
Trace("Writing: " + next); Trace("Writing: " + next);
var messageIsSent = false;
if (next is IMultiMessage)
{
SelectDatabase(tmp, next); // need to switch database *before* the transaction
foreach (var subCommand in ((IMultiMessage)next).GetMessages(tmp))
{
if (!WriteMessageToServer(tmp, subCommand))
{
// we screwed up; abort; note that WriteMessageToServer already
// killed the underlying connection
Trace("Unable to write to server");
next.Fail(ConnectionFailureType.ProtocolFailure, null);
CompleteSyncOrAsync(next);
return false;
}
//The parent message (next) may be returned from GetMessages
//and should not be marked as sent again below
messageIsSent = messageIsSent || subCommand == next;
}
if (!messageIsSent)
{
next.SetRequestSent();
}
return true; bool result;
} lock (WriteLock)
else
{ {
return WriteMessageToServer(tmp, next); var messageIsSent = false;
} if (next is IMultiMessage)
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
static ValueTask<T> AsResult<T>(T value) => new ValueTask<T>(value);
internal async ValueTask<WriteResult> WriteQueueAsync(int maxWork)
{
bool weAreWriter = false;
PhysicalConnection conn = null;
try
{
Trace("Writing queue from bridge");
weAreWriter = Interlocked.CompareExchange(ref activeWriters, 1, 0) == 0;
if (!weAreWriter)
{
Trace("(aborting: existing writer)");
return WriteResult.CompetingWriter;
}
conn = GetConnection(null);
if (conn == null)
{ {
AbortUnsent(); SelectDatabase(physical, next); // need to switch database *before* the transaction
Trace("Connection not available; exiting"); foreach (var subCommand in ((IMultiMessage)next).GetMessages(physical))
return WriteResult.NoConnection;
}
Message last;
int count = 0;
while (true)
{
var next = queue.Dequeue();
if (next == null)
{ {
Trace("Nothing to write; exiting"); if (!WriteMessageToServer(physical, subCommand))
if(count == 0)
{ {
await conn.FlushAsync(); // only flush on an empty run // we screwed up; abort; note that WriteMessageToServer already
return WriteResult.NothingToDo; // killed the underlying connection
Trace("Unable to write to server");
next.Fail(ConnectionFailureType.ProtocolFailure, null);
CompleteSyncOrAsync(next);
return false;
} }
return WriteResult.QueueEmptyAfterWrite; //The parent message (next) may be returned from GetMessages
//and should not be marked as sent again below
messageIsSent = messageIsSent || subCommand == next;
} }
last = next; if (!messageIsSent)
Trace("Now pending: " + GetPendingCount());
if (!WriteMessageDirect(conn, next))
{ {
AbortUnsent(); next.SetRequestSent(); // well, it was attempted, at least...
Trace("write failed; connection is toast; exiting");
return WriteResult.NoConnection;
} }
count++;
if (maxWork > 0 && count >= maxWork) result = true;
{
Trace("Work limit; exiting");
Trace(last != null, "Flushed up to: " + last);
await conn.FlushAsync();
break;
}
}
}
catch (IOException ex)
{
if (conn != null)
{
conn.RecordConnectionFailed(ConnectionFailureType.SocketFailure, ex);
conn = null;
} }
AbortUnsent(); else
}
catch (Exception ex)
{
AbortUnsent();
OnInternalError(ex);
}
finally
{
if (weAreWriter)
{ {
Interlocked.Exchange(ref activeWriters, 0); result = WriteMessageToServer(physical, next);
Trace("Exiting writer");
} }
physical.WakeWriterAndCheckForThrottle();
} }
return queue.Any() ? WriteResult.MoreWork : WriteResult.QueueEmptyAfterWrite;
}
private void AbortUnsent() return result;
{
var dead = queue.DequeueAll();
Trace(dead.Length != 0, "Aborting " + dead.Length + " messages");
for (int i = 0; i < dead.Length; i++)
{
var msg = dead[i];
msg.Fail(ConnectionFailureType.UnableToResolvePhysicalConnection, null);
CompleteSyncOrAsync(msg);
}
} }
private State ChangeState(State newState) private State ChangeState(State newState)
...@@ -656,11 +544,6 @@ private State ChangeState(State newState) ...@@ -656,11 +544,6 @@ private State ChangeState(State newState)
if (oldState != newState) if (oldState != newState)
{ {
Multiplexer.Trace(ConnectionType + " state changed from " + oldState + " to " + newState); Multiplexer.Trace(ConnectionType + " state changed from " + oldState + " to " + newState);
if (newState == State.Disconnected)
{
AbortUnsent();
}
} }
return oldState; return oldState;
} }
......
...@@ -169,14 +169,14 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, ref Socket ...@@ -169,14 +169,14 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, ref Socket
// stop anything new coming in... // stop anything new coming in...
Bridge.Trace("Failed: " + failureType); Bridge.Trace("Failed: " + failureType);
int @in = -1, ar = -1; int @in = -1;
managerState = SocketManager.ManagerState.RecordConnectionFailed_OnDisconnected; managerState = SocketManager.ManagerState.RecordConnectionFailed_OnDisconnected;
Bridge.OnDisconnected(failureType, this, out bool isCurrent, out PhysicalBridge.State oldState); Bridge.OnDisconnected(failureType, this, out bool isCurrent, out PhysicalBridge.State oldState);
if (oldState == PhysicalBridge.State.ConnectedEstablished) if (oldState == PhysicalBridge.State.ConnectedEstablished)
{ {
try try
{ {
@in = GetAvailableInboundBytes(out ar); @in = GetAvailableInboundBytes();
} }
catch { /* best effort only */ } catch { /* best effort only */ }
} }
...@@ -211,23 +211,16 @@ void add(string lk, string sk, string v) ...@@ -211,23 +211,16 @@ void add(string lk, string sk, string v)
add("Last-Write", "last-write", (unchecked(now - lastWrite) / 1000) + "s ago"); add("Last-Write", "last-write", (unchecked(now - lastWrite) / 1000) + "s ago");
add("Unanswered-Write", "unanswered-write", (unchecked(now - unansweredRead) / 1000) + "s ago"); add("Unanswered-Write", "unanswered-write", (unchecked(now - unansweredRead) / 1000) + "s ago");
add("Keep-Alive", "keep-alive", Bridge.ServerEndPoint.WriteEverySeconds + "s"); add("Keep-Alive", "keep-alive", Bridge.ServerEndPoint.WriteEverySeconds + "s");
add("Pending", "pending", Bridge.GetPendingCount().ToString());
add("Previous-Physical-State", "state", oldState.ToString()); add("Previous-Physical-State", "state", oldState.ToString());
if (@in >= 0) if (@in >= 0)
{ {
add("Inbound-Bytes", "in", @in.ToString()); add("Inbound-Bytes", "in", @in.ToString());
add("Active-Readers", "ar", ar.ToString());
} }
add("Last-Heartbeat", "last-heartbeat", (lastBeat == 0 ? "never" : ((unchecked(now - lastBeat) / 1000) + "s ago")) + (Bridge.IsBeating ? " (mid-beat)" : "")); add("Last-Heartbeat", "last-heartbeat", (lastBeat == 0 ? "never" : ((unchecked(now - lastBeat) / 1000) + "s ago")) + (Bridge.IsBeating ? " (mid-beat)" : ""));
add("Last-Multiplexer-Heartbeat", "last-mbeat", Multiplexer.LastHeartbeatSecondsAgo + "s ago"); add("Last-Multiplexer-Heartbeat", "last-mbeat", Multiplexer.LastHeartbeatSecondsAgo + "s ago");
add("Last-Global-Heartbeat", "global", ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago"); add("Last-Global-Heartbeat", "global", ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago");
#if FEATURE_SOCKET_MODE_POLL
var mgr = Bridge.Multiplexer.SocketManager;
add("SocketManager-State", "mgr", mgr.State.ToString());
add("Last-Error", "err", mgr.LastErrorTimeRelative());
#endif
} }
var ex = innerException == null var ex = innerException == null
...@@ -592,6 +585,12 @@ private static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix = ...@@ -592,6 +585,12 @@ private static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix =
return WriteCrlf(span, offset); return WriteCrlf(span, offset);
} }
internal void WakeWriterAndCheckForThrottle()
{
var flush = _ioPipe.Output.FlushAsync();
if (!flush.IsCompletedSuccessfully) flush.AsTask().Wait();
}
static readonly byte[] NullBulkString = Encoding.ASCII.GetBytes("$-1\r\n"), EmptyBulkString = Encoding.ASCII.GetBytes("$0\r\n\r\n"); static readonly byte[] NullBulkString = Encoding.ASCII.GetBytes("$-1\r\n"), EmptyBulkString = Encoding.ASCII.GetBytes("$0\r\n\r\n");
private static void WriteUnified(PipeWriter writer, byte[] value) private static void WriteUnified(PipeWriter writer, byte[] value)
{ {
...@@ -801,15 +800,8 @@ private static void WriteUnified(PipeWriter writer, long value) ...@@ -801,15 +800,8 @@ private static void WriteUnified(PipeWriter writer, long value)
var bytes = WriteRaw(span, value, withLengthPrefix: true, offset: 1); var bytes = WriteRaw(span, value, withLengthPrefix: true, offset: 1);
writer.Advance(bytes); writer.Advance(bytes);
} }
private int haveReader; internal int GetAvailableInboundBytes() => socketToken.Available;
internal int GetAvailableInboundBytes(out int activeReaders)
{
activeReaders = Interlocked.CompareExchange(ref haveReader, 0, 0);
return socketToken.Available;
}
private static LocalCertificateSelectionCallback GetAmbientCertificateCallback() private static LocalCertificateSelectionCallback GetAmbientCertificateCallback()
{ {
......
...@@ -367,15 +367,15 @@ internal ServerCounters GetCounters() ...@@ -367,15 +367,15 @@ internal ServerCounters GetCounters()
return counters; return counters;
} }
internal int GetOutstandingCount(RedisCommand command, out int inst, out int qu, out int qs, out int qc, out int wr, out int wq, out int @in, out int ar) internal int GetOutstandingCount(RedisCommand command, out int inst, out int qs, out int qc, out int @in)
{ {
var bridge = GetBridge(command, false); var bridge = GetBridge(command, false);
if (bridge == null) if (bridge == null)
{ {
return inst = qu = qs = qc = wr = wq = @in = ar = 0; return inst = qs = qc = @in = 0;
} }
return bridge.GetOutstandingCount(out inst, out qu, out qs, out qc, out wr, out wq, out @in, out ar); return bridge.GetOutstandingCount(out inst, out qs, out qc, out @in);
} }
internal string GetProfile() internal string GetProfile()
......
using System; using System;
using System.Collections.Generic;
using System.IO; using System.IO;
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Net; using System.Net;
...@@ -13,9 +12,7 @@ namespace StackExchange.Redis ...@@ -13,9 +12,7 @@ namespace StackExchange.Redis
internal enum SocketMode internal enum SocketMode
{ {
Abort, Abort,
[Obsolete("just don't", error: true)] Async,
Poll,
Async
} }
/// <summary> /// <summary>
...@@ -108,11 +105,7 @@ internal enum ManagerState ...@@ -108,11 +105,7 @@ internal enum ManagerState
ProcessReadQueue, ProcessReadQueue,
ProcessErrorQueue, ProcessErrorQueue,
} }
private readonly Queue<PhysicalBridge> writeQueue = new Queue<PhysicalBridge>();
private bool isDisposed;
private readonly bool useHighPrioritySocketThreads = true;
/// <summary> /// <summary>
/// Gets the name of this SocketManager instance /// Gets the name of this SocketManager instance
/// </summary> /// </summary>
...@@ -133,12 +126,7 @@ public SocketManager(string name, bool useHighPrioritySocketThreads) ...@@ -133,12 +126,7 @@ public SocketManager(string name, bool useHighPrioritySocketThreads)
{ {
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name; if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
Name = name; Name = name;
this.useHighPrioritySocketThreads = useHighPrioritySocketThreads;
_writeOneQueueAsync = () => WriteOneQueueAsync();
Task.Run(() => WriteAllQueuesAsync());
const int Receive_PauseWriterThreshold = 1024 * 1024 * 1024; // let's give it up to 1GiB of buffer for now const int Receive_PauseWriterThreshold = 1024 * 1024 * 1024; // let's give it up to 1GiB of buffer for now
var defaultPipeOptions = PipeOptions.Default; var defaultPipeOptions = PipeOptions.Default;
...@@ -159,9 +147,6 @@ public SocketManager(string name, bool useHighPrioritySocketThreads) ...@@ -159,9 +147,6 @@ public SocketManager(string name, bool useHighPrioritySocketThreads)
readonly DedicatedThreadPoolPipeScheduler _scheduler; readonly DedicatedThreadPoolPipeScheduler _scheduler;
internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions; internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions;
private readonly Func<Task> _writeOneQueueAsync;
private enum CallbackOperation private enum CallbackOperation
{ {
Read, Read,
...@@ -174,12 +159,6 @@ private enum CallbackOperation ...@@ -174,12 +159,6 @@ private enum CallbackOperation
public void Dispose() public void Dispose()
{ {
_scheduler?.Dispose(); _scheduler?.Dispose();
lock (writeQueue)
{
// make sure writer threads know to exit
isDisposed = true;
Monitor.PulseAll(writeQueue);
}
OnDispose(); OnDispose();
} }
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log) internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log)
...@@ -247,27 +226,6 @@ void proxyCallback(IAsyncResult ar) ...@@ -247,27 +226,6 @@ void proxyCallback(IAsyncResult ar)
return token; return token;
} }
internal void RequestWrite(PhysicalBridge bridge, bool forced)
{
if (Interlocked.CompareExchange(ref bridge.inWriteQueue, 1, 0) == 0 || forced)
{
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
if (writeQueue.Count == 1)
{
Monitor.PulseAll(writeQueue);
}
else if (writeQueue.Count >= 2)
{ // struggling are we? let's have some help dealing with the backlog
Task.Run(_writeOneQueueAsync);
}
}
}
}
internal void Shutdown(SocketToken token) internal void Shutdown(SocketToken token)
{ {
Shutdown(token.Socket); Shutdown(token.Socket);
...@@ -345,88 +303,5 @@ private void Shutdown(Socket socket) ...@@ -345,88 +303,5 @@ private void Shutdown(Socket socket)
try { socket.Dispose(); } catch { } try { socket.Dispose(); } catch { }
} }
} }
private async Task WriteAllQueuesAsync()
{
while (true)
{
PhysicalBridge bridge;
lock (writeQueue)
{
if (writeQueue.Count == 0)
{
if (isDisposed) break; // <========= exit point
Monitor.Wait(writeQueue);
if (isDisposed) break; // (woken by Dispose)
if (writeQueue.Count == 0) continue; // still nothing...
}
bridge = writeQueue.Dequeue();
}
switch (await bridge.WriteQueueAsync(200))
{
case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
// back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
break;
case WriteResult.CompetingWriter:
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
break;
case WriteResult.NothingToDo:
if (!bridge.ConfirmRemoveFromWriteQueue())
{ // more snuck in; back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
}
break;
}
}
}
private Task WriteOneQueueAsync()
{
PhysicalBridge bridge;
lock (writeQueue)
{
bridge = writeQueue.Count == 0 ? null : writeQueue.Dequeue();
}
if (bridge == null) return Task.CompletedTask;
return WriteOneQueueAsyncImpl(bridge);
}
private async Task WriteOneQueueAsyncImpl(PhysicalBridge bridge)
{
bool keepGoing;
do
{
switch (await bridge.WriteQueueAsync(-1))
{
case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
keepGoing = true;
break;
case WriteResult.NothingToDo:
keepGoing = !bridge.ConfirmRemoveFromWriteQueue();
break;
case WriteResult.CompetingWriter:
keepGoing = false;
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
keepGoing = false;
break;
default:
keepGoing = false;
break;
}
} while (keepGoing);
}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment