Commit ea46109b authored by Marc Gravell's avatar Marc Gravell

ensure that we actually update the last-write-time so we don't constantly...

ensure that we actually update the last-write-time so we don't constantly heartbeat; pre-emptively set last-write-time *before* sending heartbeat, to prevent over-runs; track physical's write-state
parent 092c5ac9
...@@ -263,6 +263,7 @@ internal void KeepAlive() ...@@ -263,6 +263,7 @@ internal void KeepAlive()
msg.SetInternalCall(); msg.SetInternalCall();
Multiplexer.Trace("Enqueue: " + msg); Multiplexer.Trace("Enqueue: " + msg);
Multiplexer.OnInfoMessage($"heartbeat '{msg.CommandAndKey}' on '{PhysicalName}' (v{features.Version})"); Multiplexer.OnInfoMessage($"heartbeat '{msg.CommandAndKey}' on '{PhysicalName}' (v{features.Version})");
physical?.UpdateLastWriteTime(); // pre-emptively
var result = TryWrite(msg, ServerEndPoint.IsSlave); var result = TryWrite(msg, ServerEndPoint.IsSlave);
if (result != WriteResult.Success) if (result != WriteResult.Success)
...@@ -375,6 +376,7 @@ private void AbandonPendingBacklog(Exception ex) ...@@ -375,6 +376,7 @@ private void AbandonPendingBacklog(Exception ex)
internal void OnFullyEstablished(PhysicalConnection connection) internal void OnFullyEstablished(PhysicalConnection connection)
{ {
Trace("OnFullyEstablished"); Trace("OnFullyEstablished");
connection?.SetIdle();
if (physical == connection && !isDisposed && ChangeState(State.ConnectedEstablishing, State.ConnectedEstablished)) if (physical == connection && !isDisposed && ChangeState(State.ConnectedEstablishing, State.ConnectedEstablished))
{ {
reportNextFailure = reconfigureNextFailure = true; reportNextFailure = reconfigureNextFailure = true;
...@@ -562,6 +564,7 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me ...@@ -562,6 +564,7 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me
{ {
Multiplexer?.OnInfoMessage("reentrant call to WriteMessageTakingWriteLock for " + message.CommandAndKey); Multiplexer?.OnInfoMessage("reentrant call to WriteMessageTakingWriteLock for " + message.CommandAndKey);
} }
physical.SetWriting();
var messageIsSent = false; var messageIsSent = false;
if (message is IMultiMessage) if (message is IMultiMessage)
{ {
...@@ -593,7 +596,8 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me ...@@ -593,7 +596,8 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me
{ {
result = WriteMessageToServerInsideWriteLock(physical, message); result = WriteMessageToServerInsideWriteLock(physical, message);
} }
result = physical.WakeWriterAndCheckForThrottle(); result = physical.FlushSync();
physical.SetIdle();
} }
finally finally
{ {
......
...@@ -269,17 +269,20 @@ public void Dispose() ...@@ -269,17 +269,20 @@ public void Dispose()
private async Task AwaitedFlush(ValueTask<FlushResult> flush) private async Task AwaitedFlush(ValueTask<FlushResult> flush)
{ {
await flush; await flush;
Interlocked.Exchange(ref lastWriteTickCount, Environment.TickCount); _writeStatus = WriteStatus.Flushed;
UpdateLastWriteTime();
} }
internal void UpdateLastWriteTime() => Interlocked.Exchange(ref lastWriteTickCount, Environment.TickCount);
public Task FlushAsync() public Task FlushAsync()
{ {
var tmp = _ioPipe?.Output; var tmp = _ioPipe?.Output;
if (tmp != null) if (tmp != null)
{ {
_writeStatus = WriteStatus.Flushing;
var flush = tmp.FlushAsync(); var flush = tmp.FlushAsync();
if (!flush.IsCompletedSuccessfully) return AwaitedFlush(flush); if (!flush.IsCompletedSuccessfully) return AwaitedFlush(flush);
Interlocked.Exchange(ref lastWriteTickCount, Environment.TickCount); _writeStatus = WriteStatus.Flushed;
UpdateLastWriteTime();
} }
return Task.CompletedTask; return Task.CompletedTask;
} }
...@@ -342,6 +345,7 @@ public Task FlushAsync() ...@@ -342,6 +345,7 @@ public Task FlushAsync()
if (bridge != null) if (bridge != null)
{ {
exMessage.Append(" on ").Append(Format.ToString(bridge.ServerEndPoint?.EndPoint)).Append("/").Append(connectionType) exMessage.Append(" on ").Append(Format.ToString(bridge.ServerEndPoint?.EndPoint)).Append("/").Append(connectionType)
.Append(", ").Append(_writeStatus)
.Append(", last: ").Append(bridge.LastCommand); .Append(", last: ").Append(bridge.LastCommand);
data.Add(Tuple.Create("FailureType", failureType.ToString())); data.Add(Tuple.Create("FailureType", failureType.ToString()));
...@@ -411,11 +415,21 @@ void add(string lk, string sk, string v) ...@@ -411,11 +415,21 @@ void add(string lk, string sk, string v)
Shutdown(); Shutdown();
} }
public override string ToString() internal void SetIdle() => _writeStatus = WriteStatus.Idle;
internal void SetWriting() => _writeStatus = WriteStatus.Writing;
private WriteStatus _writeStatus;
private enum WriteStatus
{ {
return physicalName; Initializing,
Idle,
Writing,
Flushing,
Flushed,
} }
public override string ToString() => $"{physicalName} ({_writeStatus})";
internal static void IdentifyFailureType(Exception exception, ref ConnectionFailureType failureType) internal static void IdentifyFailureType(Exception exception, ref ConnectionFailureType failureType)
{ {
if (exception != null && failureType == ConnectionFailureType.InternalFailure) if (exception != null && failureType == ConnectionFailureType.InternalFailure)
...@@ -799,15 +813,20 @@ internal static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix ...@@ -799,15 +813,20 @@ internal static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix
return WriteCrlf(span, offset); return WriteCrlf(span, offset);
} }
internal WriteResult WakeWriterAndCheckForThrottle() internal WriteResult FlushSync(bool throwOnFailure = false)
{ {
var tmp = _ioPipe?.Output;
if (tmp == null) return WriteResult.NoConnectionAvailable;
try try
{ {
var flush = _ioPipe.Output.FlushAsync(); _writeStatus = WriteStatus.Flushing;
var flush = tmp.FlushAsync();
if (!flush.IsCompletedSuccessfully) flush.AsTask().Wait(); if (!flush.IsCompletedSuccessfully) flush.AsTask().Wait();
_writeStatus = WriteStatus.Flushed;
UpdateLastWriteTime();
return WriteResult.Success; return WriteResult.Success;
} }
catch (ConnectionResetException ex) catch (ConnectionResetException ex) when (!throwOnFailure)
{ {
RecordConnectionFailed(ConnectionFailureType.SocketClosed, ex); RecordConnectionFailed(ConnectionFailureType.SocketClosed, ex);
return WriteResult.WriteFailure; return WriteResult.WriteFailure;
......
...@@ -274,7 +274,8 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -274,7 +274,8 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
sb.AppendLine("checking conditions in the *early* path"); sb.AppendLine("checking conditions in the *early* path");
// need to get those sent ASAP; if they are stuck in the buffers, we die // need to get those sent ASAP; if they are stuck in the buffers, we die
multiplexer.Trace("Flushing and waiting for precondition responses"); multiplexer.Trace("Flushing and waiting for precondition responses");
connection.FlushAsync().Wait(); connection.FlushSync(true); // make sure they get sent, so we can check for QUEUED (and the pre-conditions if necessary)
if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds)) if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds))
{ {
if (!AreAllConditionsSatisfied(multiplexer)) if (!AreAllConditionsSatisfied(multiplexer))
...@@ -330,7 +331,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -330,7 +331,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
sb.AppendLine("checking conditions in the *late* path"); sb.AppendLine("checking conditions in the *late* path");
multiplexer.Trace("Flushing and waiting for precondition+queued responses"); multiplexer.Trace("Flushing and waiting for precondition+queued responses");
connection.FlushAsync().Wait(); // make sure they get sent, so we can check for QUEUED (and the pre-conditions if necessary) connection.FlushSync(true); // make sure they get sent, so we can check for QUEUED (and the pre-conditions if necessary)
if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds)) if (Monitor.Wait(lastBox, multiplexer.TimeoutMilliseconds))
{ {
if (!AreAllConditionsSatisfied(multiplexer)) if (!AreAllConditionsSatisfied(multiplexer))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment