Commit 71763cb4 authored by Marc Gravell's avatar Marc Gravell

Try to detect socket fail in heartbeat

parent 2e77071e
...@@ -37,5 +37,9 @@ public enum ConnectionFailureType ...@@ -37,5 +37,9 @@ public enum ConnectionFailureType
/// The socket was closed /// The socket was closed
/// </summary> /// </summary>
ConnectionDisposed, ConnectionDisposed,
/// <summary>
/// The database is loading and is not available for use
/// </summary>
Loading
} }
} }
...@@ -78,7 +78,7 @@ internal int Count() ...@@ -78,7 +78,7 @@ internal int Count()
} }
} }
internal bool HasWork() internal bool Any()
{ {
lock(regular) lock(regular)
{ {
...@@ -108,13 +108,5 @@ internal void GetStormLog(StringBuilder sb) ...@@ -108,13 +108,5 @@ internal void GetStormLog(StringBuilder sb)
} }
} }
} }
internal bool Any()
{
lock(regular)
{
return high.Count != 0 || regular.Count != 0;
}
}
} }
} }
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.IO;
using System.Net; using System.Net;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Text; using System.Text;
...@@ -173,7 +174,7 @@ internal int GetOutstandingCount(out int inst, out int qu, out int qs, out int q ...@@ -173,7 +174,7 @@ internal int GetOutstandingCount(out int inst, out int qu, out int qs, out int q
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog)); inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
qu = queue.Count(); qu = queue.Count();
var tmp = physical; var tmp = physical;
qs = tmp == null ? 0 : tmp.GetOutstandingCount(); qs = tmp == null ? 0 : tmp.GetSentAwaitingResponseCount();
qc = completionManager.GetOutstandingCount(); qc = completionManager.GetOutstandingCount();
wr = Interlocked.CompareExchange(ref activeWriters, 0, 0); wr = Interlocked.CompareExchange(ref activeWriters, 0, 0);
wq = Interlocked.CompareExchange(ref inWriteQueue, 0, 0); wq = Interlocked.CompareExchange(ref inWriteQueue, 0, 0);
...@@ -378,6 +379,12 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -378,6 +379,12 @@ internal void OnHeartbeat(bool ifConnectedOnly)
State oldState; State oldState;
OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out ignore, out oldState); OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out ignore, out oldState);
} }
} else if(!queue.Any() && tmp.GetSentAwaitingResponseCount() != 0)
{
// there's a chance this is a dead socket; sending data will shake that
// up a bit, so if we have an empty unsent queue and a non-empty sent
// queue, test the socket
KeepAlive();
} }
} }
break; break;
...@@ -695,6 +702,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message ...@@ -695,6 +702,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
internal WriteResult WriteQueue(int maxWork) internal WriteResult WriteQueue(int maxWork)
{ {
bool weAreWriter = false; bool weAreWriter = false;
PhysicalConnection conn = null;
try try
{ {
Trace("Writing queue from bridge"); Trace("Writing queue from bridge");
...@@ -706,7 +714,7 @@ internal WriteResult WriteQueue(int maxWork) ...@@ -706,7 +714,7 @@ internal WriteResult WriteQueue(int maxWork)
return WriteResult.CompetingWriter; return WriteResult.CompetingWriter;
} }
var conn = GetConnection(); conn = GetConnection();
if(conn == null) if(conn == null)
{ {
AbortUnsent(); AbortUnsent();
...@@ -745,8 +753,14 @@ internal WriteResult WriteQueue(int maxWork) ...@@ -745,8 +753,14 @@ internal WriteResult WriteQueue(int maxWork)
} }
} }
} }
catch(IOException ex)
{
if (conn != null) conn.RecordConnectionFailed(ConnectionFailureType.SocketFailure, ex);
AbortUnsent();
}
catch(Exception ex) catch(Exception ex)
{ {
AbortUnsent();
OnInternalError(ex); OnInternalError(ex);
} }
finally finally
......
...@@ -166,7 +166,7 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -166,7 +166,7 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
lastBeat = Interlocked.Read(ref lastBeatTickCount); lastBeat = Interlocked.Read(ref lastBeatTickCount);
string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType
+ ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetOutstandingCount() + ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetSentAwaitingResponseCount()
+ ", last-read: " + unchecked(now - lastRead) / 1000 + "s ago, last-write: " + unchecked(now - lastWrite) / 1000 + "s ago, keep-alive: " + bridge.ServerEndPoint.WriteEverySeconds + "s, pending: " + ", last-read: " + unchecked(now - lastRead) / 1000 + "s ago, last-write: " + unchecked(now - lastWrite) / 1000 + "s ago, keep-alive: " + bridge.ServerEndPoint.WriteEverySeconds + "s, pending: "
+ bridge.GetPendingCount() + ", state: " + oldState + ", last-heartbeat: " + (lastBeat == 0 ? "never" : (unchecked(now - lastBeat) / 1000 + "s ago")) + bridge.GetPendingCount() + ", state: " + oldState + ", last-heartbeat: " + (lastBeat == 0 ? "never" : (unchecked(now - lastBeat) / 1000 + "s ago"))
+ (bridge.IsBeating ? " (mid-beat)" : "") + ", last-mbeat: " + multiplexer.LastHeartbeatSecondsAgo + "s ago, global: " + (bridge.IsBeating ? " (mid-beat)" : "") + ", last-mbeat: " + multiplexer.LastHeartbeatSecondsAgo + "s ago, global: "
...@@ -214,8 +214,9 @@ internal static void IdentifyFailureType(Exception exception, ref ConnectionFail ...@@ -214,8 +214,9 @@ internal static void IdentifyFailureType(Exception exception, ref ConnectionFail
if (exception != null && failureType == ConnectionFailureType.InternalFailure) if (exception != null && failureType == ConnectionFailureType.InternalFailure)
{ {
if (exception is AuthenticationException) failureType = ConnectionFailureType.AuthenticationFailure; if (exception is AuthenticationException) failureType = ConnectionFailureType.AuthenticationFailure;
if (exception is EndOfStreamException) failureType = ConnectionFailureType.SocketClosed; else if (exception is SocketException || exception is IOException) failureType = ConnectionFailureType.SocketFailure;
if (exception is SocketException || exception is IOException) failureType = ConnectionFailureType.SocketFailure; else if (exception is EndOfStreamException) failureType = ConnectionFailureType.SocketClosed;
else if (exception is ObjectDisposedException) failureType = ConnectionFailureType.SocketClosed;
} }
} }
...@@ -236,7 +237,7 @@ internal void GetCounters(ConnectionCounters counters) ...@@ -236,7 +237,7 @@ internal void GetCounters(ConnectionCounters counters)
counters.Subscriptions = SubscriptionCount; counters.Subscriptions = SubscriptionCount;
} }
internal int GetOutstandingCount() internal int GetSentAwaitingResponseCount()
{ {
lock (outstanding) lock (outstanding)
{ {
......
...@@ -653,7 +653,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -653,7 +653,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
sealed class EstablishConnectionProcessor : ResultProcessor<bool> sealed class EstablishConnectionProcessor : ResultProcessor<bool>
{ {
static readonly byte[] expected = Encoding.UTF8.GetBytes("PONG"), authFail = Encoding.UTF8.GetBytes("ERR operation not permitted"); static readonly byte[] expected = Encoding.UTF8.GetBytes("PONG"), authFail = Encoding.UTF8.GetBytes("ERR operation not permitted"),
loading = Encoding.UTF8.GetBytes("LOADING ");
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result) public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{ {
var final = base.SetResult(connection, message, result); var final = base.SetResult(connection, message, result);
...@@ -662,6 +663,9 @@ public override bool SetResult(PhysicalConnection connection, Message message, R ...@@ -662,6 +663,9 @@ public override bool SetResult(PhysicalConnection connection, Message message, R
if (result.Assert(authFail)) if (result.Assert(authFail))
{ {
connection.RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure); connection.RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure);
} else if(result.AssertStarts(loading))
{
connection.RecordConnectionFailed(ConnectionFailureType.Loading);
} }
else else
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment