Commit 8a3f264a authored by Marc Gravell's avatar Marc Gravell

don't be so aggressive about adding PINGs if we're already backed up; no need...

don't be so aggressive about adding PINGs if we're already backed up; no need to report QUIT as faulted if the socket died (that's always a race)
parent 78d04c28
...@@ -619,6 +619,8 @@ internal void SetException(Exception exception) ...@@ -619,6 +619,8 @@ internal void SetException(Exception exception)
resultBox?.SetException(exception); resultBox?.SetException(exception);
} }
internal bool TrySetResult<T>(T value) => resultBox is ResultBox<T> typed && typed.TrySetResult(value);
internal void SetEnqueued() => performance?.SetEnqueued(); internal void SetEnqueued() => performance?.SetEnqueued();
internal void SetRequestSent() internal void SetRequestSent()
......
...@@ -260,6 +260,7 @@ internal void KeepAlive() ...@@ -260,6 +260,7 @@ internal void KeepAlive()
} }
break; break;
} }
if (msg != null) if (msg != null)
{ {
msg.SetInternalCall(); msg.SetInternalCall();
...@@ -460,7 +461,9 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -460,7 +461,9 @@ internal void OnHeartbeat(bool ifConnectedOnly)
OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out bool ignore, out State oldState); OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out bool ignore, out State oldState);
} }
} }
else if (tmp.GetSentAwaitingResponseCount() != 0) else if (writeEverySeconds <= 0 && tmp.IsIdle()
&& tmp.LastWriteSecondsAgo > 2
&& tmp.GetSentAwaitingResponseCount() != 0)
{ {
// there's a chance this is a dead socket; sending data will shake that // there's a chance this is a dead socket; sending data will shake that
// up a bit, so if we have an empty unsent queue and a non-empty sent // up a bit, so if we have an empty unsent queue and a non-empty sent
......
...@@ -399,6 +399,13 @@ void add(string lk, string sk, string v) ...@@ -399,6 +399,13 @@ void add(string lk, string sk, string v)
while (_writtenAwaitingResponse.Count != 0) while (_writtenAwaitingResponse.Count != 0)
{ {
var next = _writtenAwaitingResponse.Dequeue(); var next = _writtenAwaitingResponse.Dequeue();
if (next.Command == RedisCommand.QUIT && next.TrySetResult(true))
{
// fine, death of a socket is close enough
}
else
{
var ex = innerException is RedisException ? innerException : outerException; var ex = innerException is RedisException ? innerException : outerException;
if (bridge != null) if (bridge != null)
{ {
...@@ -406,6 +413,7 @@ void add(string lk, string sk, string v) ...@@ -406,6 +413,7 @@ void add(string lk, string sk, string v)
bridge.Multiplexer?.OnMessageFaulted(next, ex, origin); bridge.Multiplexer?.OnMessageFaulted(next, ex, origin);
} }
next.SetException(ex); next.SetException(ex);
}
bridge.CompleteSyncOrAsync(next); bridge.CompleteSyncOrAsync(next);
} }
} }
......
...@@ -82,6 +82,13 @@ public void SetResult(T value) ...@@ -82,6 +82,13 @@ public void SetResult(T value)
this.value = value; this.value = value;
} }
internal bool TrySetResult(T value)
{
if (_exception != null) return false;
this.value = value;
return true;
}
public override bool IsAsync => stateOrCompletionSource is TaskCompletionSource<T>; public override bool IsAsync => stateOrCompletionSource is TaskCompletionSource<T>;
public override bool TryComplete(bool isAsync) public override bool TryComplete(bool isAsync)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment