Commit d29f54fb authored by Pavel Pochobut's avatar Pavel Pochobut

stale connection detection changed to use first unanswered write timestamp...

stale connection detection changed to use first unanswered write timestamp instead of lastWrite - lastRead diff
parent e887b3b3
...@@ -273,11 +273,11 @@ public static bool EmulateStaleConnection ...@@ -273,11 +273,11 @@ public static bool EmulateStaleConnection
} }
} }
partial void DebugEmulateStaleConnection(ref int lastRead) partial void DebugEmulateStaleConnection(ref int firstUnansweredWrite)
{ {
if (emulateStaleConnection) if (emulateStaleConnection)
{ {
lastRead -= 100500; firstUnansweredWrite = Environment.TickCount - 100000;
} }
} }
} }
......
...@@ -64,6 +64,7 @@ private static readonly Message ...@@ -64,6 +64,7 @@ private static readonly Message
int ioBufferBytes = 0; int ioBufferBytes = 0;
int lastWriteTickCount, lastReadTickCount, lastBeatTickCount; int lastWriteTickCount, lastReadTickCount, lastBeatTickCount;
int firstUnansweredWriteTickCount;
private Stream netStream, outStream; private Stream netStream, outStream;
...@@ -85,6 +86,7 @@ public PhysicalConnection(PhysicalBridge bridge) ...@@ -85,6 +86,7 @@ public PhysicalConnection(PhysicalBridge bridge)
public void BeginConnect() public void BeginConnect()
{ {
Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0);
var endpoint = this.bridge.ServerEndPoint.EndPoint; var endpoint = this.bridge.ServerEndPoint.EndPoint;
multiplexer.Trace("Connecting...", physicalName); multiplexer.Trace("Connecting...", physicalName);
...@@ -166,10 +168,13 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -166,10 +168,13 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
{ {
int now = Environment.TickCount, lastRead = Thread.VolatileRead(ref lastReadTickCount), lastWrite = Thread.VolatileRead(ref lastWriteTickCount), int now = Environment.TickCount, lastRead = Thread.VolatileRead(ref lastReadTickCount), lastWrite = Thread.VolatileRead(ref lastWriteTickCount),
lastBeat = Thread.VolatileRead(ref lastBeatTickCount); lastBeat = Thread.VolatileRead(ref lastBeatTickCount);
int unansweredRead = Thread.VolatileRead(ref firstUnansweredWriteTickCount);
string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType
+ ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetSentAwaitingResponseCount() + ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetSentAwaitingResponseCount()
+ ", last-read: " + unchecked(now - lastRead) / 1000 + "s ago, last-write: " + unchecked(now - lastWrite) / 1000 + "s ago, keep-alive: " + bridge.ServerEndPoint.WriteEverySeconds + "s, pending: " + ", last-read: " + unchecked(now - lastRead) / 1000 + "s ago, last-write: " + unchecked(now - lastWrite) / 1000 + "s ago"
+ ", unanswered-write: " + unchecked(now - unansweredRead) / 1000 + "s ago"
+ ", keep-alive: " + bridge.ServerEndPoint.WriteEverySeconds + "s, pending: "
+ bridge.GetPendingCount() + ", state: " + oldState + ", last-heartbeat: " + (lastBeat == 0 ? "never" : (unchecked(now - lastBeat) / 1000 + "s ago")) + bridge.GetPendingCount() + ", state: " + oldState + ", last-heartbeat: " + (lastBeat == 0 ? "never" : (unchecked(now - lastBeat) / 1000 + "s ago"))
+ (bridge.IsBeating ? " (mid-beat)" : "") + ", last-mbeat: " + multiplexer.LastHeartbeatSecondsAgo + "s ago, global: " + (bridge.IsBeating ? " (mid-beat)" : "") + ", last-mbeat: " + multiplexer.LastHeartbeatSecondsAgo + "s ago, global: "
+ ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago"; + ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago";
...@@ -381,6 +386,10 @@ internal void WriteHeader(RedisCommand command, int arguments) ...@@ -381,6 +386,10 @@ internal void WriteHeader(RedisCommand command, int arguments)
throw ExceptionFactory.CommandDisabled(multiplexer.IncludeDetailInExceptions, command, null, bridge.ServerEndPoint); throw ExceptionFactory.CommandDisabled(multiplexer.IncludeDetailInExceptions, command, null, bridge.ServerEndPoint);
} }
outStream.WriteByte((byte)'*'); outStream.WriteByte((byte)'*');
// remember the time of the first write that still not followed by read
Interlocked.CompareExchange(ref firstUnansweredWriteTickCount, Environment.TickCount, 0);
WriteRaw(outStream, arguments + 1); WriteRaw(outStream, arguments + 1);
WriteUnified(outStream, commandBytes); WriteUnified(outStream, commandBytes);
} }
...@@ -816,6 +825,10 @@ private bool ProcessReadBytes(int bytesRead) ...@@ -816,6 +825,10 @@ private bool ProcessReadBytes(int bytesRead)
} }
Interlocked.Exchange(ref lastReadTickCount, Environment.TickCount); Interlocked.Exchange(ref lastReadTickCount, Environment.TickCount);
// reset unanswered write timestamp
Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0);
ioBufferBytes += bytesRead; ioBufferBytes += bytesRead;
multiplexer.Trace("More bytes available: " + bytesRead + " (" + ioBufferBytes + ")", physicalName); multiplexer.Trace("More bytes available: " + bytesRead + " (" + ioBufferBytes + ")", physicalName);
int offset = 0, count = ioBufferBytes; int offset = 0, count = ioBufferBytes;
...@@ -954,17 +967,18 @@ RawResult TryParseResult(byte[] buffer, ref int offset, ref int count) ...@@ -954,17 +967,18 @@ RawResult TryParseResult(byte[] buffer, ref int offset, ref int count)
} }
} }
partial void DebugEmulateStaleConnection(ref int lastWrite); partial void DebugEmulateStaleConnection(ref int firstUnansweredWrite);
public void CheckForStaleConnection() public void CheckForStaleConnection()
{ {
int lastRead, lastWrite; int firstUnansweredWrite;
lastRead = Thread.VolatileRead(ref this.lastReadTickCount); firstUnansweredWrite = Thread.VolatileRead(ref firstUnansweredWriteTickCount);
lastWrite = Thread.VolatileRead(ref this.lastWriteTickCount);
DebugEmulateStaleConnection(ref firstUnansweredWrite);
DebugEmulateStaleConnection(ref lastRead); int now = Environment.TickCount;
if ((lastWrite - lastRead) > this.multiplexer.RawConfig.SyncTimeout) if (firstUnansweredWrite != 0 && (now - firstUnansweredWrite) > this.multiplexer.RawConfig.SyncTimeout)
{ {
this.RecordConnectionFailed(ConnectionFailureType.SocketFailure, origin: "CheckForStaleConnection"); this.RecordConnectionFailed(ConnectionFailureType.SocketFailure, origin: "CheckForStaleConnection");
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment