Commit a92d7ff3 authored by Marc Gravell's avatar Marc Gravell

TIL: Timer can randomly just *stop working* for minutes at a time; added a...

TIL: Timer can randomly just *stop working* for minutes at a time; added a packemaker via the SocketManager
parent e931d6a7
using System; using System;
using System.Threading;
using NUnit.Framework; using NUnit.Framework;
namespace StackExchange.Redis.Tests namespace StackExchange.Redis.Tests
......
...@@ -27,7 +27,7 @@ public void ConnectToSSLServer(int port, string sslHost) ...@@ -27,7 +27,7 @@ public void ConnectToSSLServer(int port, string sslHost)
SslHost = sslHost, SslHost = sslHost,
EndPoints = { { "sslredis", port} }, EndPoints = { { "sslredis", port} },
AllowAdmin = true, AllowAdmin = true,
SyncTimeout = 5000 SyncTimeout = Debugger.IsAttached ? int.MaxValue : 5000
}; };
config.CertificateValidation += (sender, cert, chain, errors) => config.CertificateValidation += (sender, cert, chain, errors) =>
{ {
...@@ -36,6 +36,8 @@ public void ConnectToSSLServer(int port, string sslHost) ...@@ -36,6 +36,8 @@ public void ConnectToSSLServer(int port, string sslHost)
}; };
using (var muxer = ConnectionMultiplexer.Connect(config, Console.Out)) using (var muxer = ConnectionMultiplexer.Connect(config, Console.Out))
{ {
muxer.ConnectionFailed += OnConnectionFailed;
muxer.InternalError += OnInternalError;
var db = muxer.GetDatabase(); var db = muxer.GetDatabase();
db.Ping(); db.Ping();
using (var file = File.Create("ssl" + port + ".zip")) using (var file = File.Create("ssl" + port + ".zip"))
...@@ -64,10 +66,10 @@ public void ConnectToSSLServer(int port, string sslHost) ...@@ -64,10 +66,10 @@ public void ConnectToSSLServer(int port, string sslHost)
// perf: sync/multi-threaded // perf: sync/multi-threaded
TestConcurrent(db, key, 30, 10); TestConcurrent(db, key, 30, 10);
TestConcurrent(db, key, 30, 20); //TestConcurrent(db, key, 30, 20);
TestConcurrent(db, key, 30, 30); //TestConcurrent(db, key, 30, 30);
TestConcurrent(db, key, 30, 40); //TestConcurrent(db, key, 30, 40);
TestConcurrent(db, key, 30, 50); //TestConcurrent(db, key, 30, 50);
} }
} }
......
...@@ -52,14 +52,24 @@ static TestBase() ...@@ -52,14 +52,24 @@ static TestBase()
} }
}; };
} }
private void Muxer_ConnectionFailed(object sender, ConnectionFailedEventArgs e) protected void OnConnectionFailed(object sender, ConnectionFailedEventArgs e)
{ {
Interlocked.Increment(ref failCount); Interlocked.Increment(ref failCount);
lock(exceptions) lock(exceptions)
{ {
exceptions.Add("Connection failed: " + e.EndPoint); exceptions.Add("Connection failed: " + EndPointCollection.ToString(e.EndPoint) + "/" + e.ConnectionType);
} }
} }
protected void OnInternalError(object sender, InternalErrorEventArgs e)
{
Interlocked.Increment(ref failCount);
lock (exceptions)
{
exceptions.Add("Internal error: " + e.Origin + ", " + EndPointCollection.ToString(e.EndPoint) + "/" + e.ConnectionType);
}
}
static int failCount; static int failCount;
volatile int expectedFailCount; volatile int expectedFailCount;
[SetUp] [SetUp]
...@@ -187,7 +197,8 @@ protected IServer GetServer(ConnectionMultiplexer muxer) ...@@ -187,7 +197,8 @@ protected IServer GetServer(ConnectionMultiplexer muxer)
Assert.Inconclusive(failMessage + "Server is not available"); Assert.Inconclusive(failMessage + "Server is not available");
} }
} }
muxer.ConnectionFailed += Muxer_ConnectionFailed; muxer.InternalError += OnInternalError;
muxer.ConnectionFailed += OnConnectionFailed;
return muxer; return muxer;
} }
......
...@@ -64,6 +64,7 @@ ...@@ -64,6 +64,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="StackExchange\Redis\Aggregate.cs" /> <Compile Include="StackExchange\Redis\Aggregate.cs" />
<Compile Include="StackExchange\Redis\InternalErrorEventArgs.cs" />
<Compile Include="StackExchange\Redis\RedisChannel.cs" /> <Compile Include="StackExchange\Redis\RedisChannel.cs" />
<Compile Include="StackExchange\Redis\Bitwise.cs" /> <Compile Include="StackExchange\Redis\Bitwise.cs" />
<Compile Include="StackExchange\Redis\ClientFlags.cs" /> <Compile Include="StackExchange\Redis\ClientFlags.cs" />
......
...@@ -262,6 +262,7 @@ internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, Tex ...@@ -262,6 +262,7 @@ internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, Tex
} }
catch (Exception ex) catch (Exception ex)
{ {
multiplexer.OnInternalError(ex);
multiplexer.LogLocked(log, ex.Message); multiplexer.LogLocked(log, ex.Message);
} }
} }
......
...@@ -7,6 +7,7 @@ namespace StackExchange.Redis ...@@ -7,6 +7,7 @@ namespace StackExchange.Redis
partial class ConnectionMultiplexer partial class ConnectionMultiplexer
{ {
internal SocketManager SocketManager { get { return socketManager; } } internal SocketManager SocketManager { get { return socketManager; } }
private SocketManager socketManager; private SocketManager socketManager;
private bool ownsSocketManager; private bool ownsSocketManager;
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
using System.Linq; using System.Linq;
using System.Net; using System.Net;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Runtime.Remoting.Lifetime;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
...@@ -90,6 +91,25 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionTyp ...@@ -90,6 +91,25 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionTyp
ReconfigureIfNeeded(endpoint, false, "connection failed"); ReconfigureIfNeeded(endpoint, false, "connection failed");
} }
} }
internal void OnInternalError(Exception exception, EndPoint endpoint = null, ConnectionType connectionType = ConnectionType.None, [CallerMemberName] string origin = null)
{
try
{
Trace("Internal error: " + origin + ", " + exception == null ? "unknown" : exception.Message);
if (isDisposed) return;
var handler = InternalError;
if (handler != null)
{
unprocessableCompletionManager.CompleteSyncOrAsync(
new InternalErrorEventArgs(handler, this, endpoint, connectionType, exception, origin)
);
}
}
catch
{ // our internal error event failed; whatcha gonna do, exactly?
}
}
internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionType) internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionType)
{ {
if (isDisposed) return; if (isDisposed) return;
...@@ -389,6 +409,11 @@ static void WriteNormalizingLineEndings(string source, StreamWriter writer) ...@@ -389,6 +409,11 @@ static void WriteNormalizingLineEndings(string source, StreamWriter writer)
/// </summary> /// </summary>
public event EventHandler<ConnectionFailedEventArgs> ConnectionFailed; public event EventHandler<ConnectionFailedEventArgs> ConnectionFailed;
/// <summary>
/// Raised whenever an internal error occurs (this is primarily for debugging)
/// </summary>
public event EventHandler<InternalErrorEventArgs> InternalError;
/// <summary> /// <summary>
/// Raised whenever a physical connection is established /// Raised whenever a physical connection is established
/// </summary> /// </summary>
...@@ -779,26 +804,48 @@ private ConnectionMultiplexer(ConfigurationOptions configuration) ...@@ -779,26 +804,48 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
{ {
ConfigurationChangedChannel = Encoding.UTF8.GetBytes(configChannel); ConfigurationChangedChannel = Encoding.UTF8.GetBytes(configChannel);
} }
lastHeartbeatTicks = Environment.TickCount;
} }
partial void OnCreateReaderWriter(ConfigurationOptions configuration); partial void OnCreateReaderWriter(ConfigurationOptions configuration);
internal const int MillisecondsPerHeartbeat = 1000; internal const int MillisecondsPerHeartbeat = 1000;
private static readonly TimerCallback heartbeat = Heartbeat; private static readonly TimerCallback heartbeat = state =>
private static void Heartbeat(object state)
{ {
((ConnectionMultiplexer)state).HeartbeatImpl(); ((ConnectionMultiplexer)state).OnHeartbeat();
} };
private void HeartbeatImpl() private void OnHeartbeat()
{ {
Trace("heartbeat"); try
{
long now = Environment.TickCount;
Interlocked.Exchange(ref lastHeartbeatTicks, now);
Interlocked.Exchange(ref lastGlobalHeartbeatTicks, now);
var lease = pulse == null ? null : pulse.GetLifetimeService() as ILease;
if (lease != null) lease.Renew(TimeSpan.FromMinutes(5));
Trace("heartbeat");
var tmp = serverSnapshot; var tmp = serverSnapshot;
for (int i = 0; i < tmp.Length; i++) for (int i = 0; i < tmp.Length; i++)
tmp[i].OnHeartbeat(); tmp[i].OnHeartbeat();
} catch(Exception ex)
{
OnInternalError(ex);
}
} }
private long lastHeartbeatTicks;
private static long lastGlobalHeartbeatTicks = Environment.TickCount;
internal long LastHeartbeatSecondsAgo {
get {
if (pulse == null) return -1;
return unchecked(Environment.TickCount - Interlocked.Read(ref lastHeartbeatTicks)) / 1000;
}
}
internal static long LastGlobalHeartbeatSecondsAgo
{ get { return unchecked(Environment.TickCount - Interlocked.Read(ref lastGlobalHeartbeatTicks)) / 1000; } }
internal CompletionManager UnprocessableCompletionManager { get { return unprocessableCompletionManager; } } internal CompletionManager UnprocessableCompletionManager { get { return unprocessableCompletionManager; } }
/// <summary> /// <summary>
...@@ -945,10 +992,38 @@ internal int SyncConnectTimeout ...@@ -945,10 +992,38 @@ internal int SyncConnectTimeout
get get
{ {
int timeout = configuration.ConnectTimeout; int timeout = configuration.ConnectTimeout;
if (timeout >= int.MaxValue - 500) return int.MaxValue;
return timeout + Math.Min(500, timeout); return timeout + Math.Min(500, timeout);
} }
} }
/// <summary>
/// Provides a text overview of the status of all connections
/// </summary>
public string GetStatus()
{
using(var sw = new StringWriter())
{
GetStatus(sw);
return sw.ToString();
}
}
/// <summary>
/// Provides a text overview of the status of all connections
/// </summary>
public void GetStatus(TextWriter log)
{
if (log == null) return;
var tmp = serverSnapshot;
foreach (var server in tmp)
{
LogLocked(log, server.Summary());
LogLocked(log, server.GetCounters().ToString());
LogLocked(log, server.GetProfile());
}
LogLocked(log, "Sync timeouts: {0}; fire and forget: {1}; last heartbeat: {2}s ago",
Interlocked.Read(ref syncTimeouts), Interlocked.Read(ref fireAndForgets), LastHeartbeatSecondsAgo);
}
internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause) internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause)
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (isDisposed) throw new ObjectDisposedException(ToString());
...@@ -1024,8 +1099,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1024,8 +1099,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
RedisKey tieBreakerKey = useTieBreakers ? (RedisKey)configuration.TieBreaker : default(RedisKey); RedisKey tieBreakerKey = useTieBreakers ? (RedisKey)configuration.TieBreaker : default(RedisKey);
for (int i = 0; i < available.Length; i++) for (int i = 0; i < available.Length; i++)
{ {
Trace("Testing: " + Format.ToString(endpoints[i]));
Trace("Testing: " + endpoints[i]);
var server = GetServerEndPoint(endpoints[i]); var server = GetServerEndPoint(endpoints[i]);
server.ReportNextFailure(); server.ReportNextFailure();
servers[i] = server; servers[i] = server;
...@@ -1056,7 +1130,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1056,7 +1130,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
for (int i = 0; i < available.Length; i++) for (int i = 0; i < available.Length; i++)
{ {
var task = available[i]; var task = available[i];
Trace(endpoints[i] + ": " + task.Status); Trace(Format.ToString(endpoints[i]) + ": " + task.Status);
if (task.IsFaulted) if (task.IsFaulted)
{ {
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond); servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
...@@ -1158,21 +1232,14 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1158,21 +1232,14 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
} }
if (showStats) if (showStats)
{ {
foreach (var server in servers) GetStatus(log);
{
LogLocked(log, server.Summary());
if (!first)
{
LogLocked(log, server.GetCounters().ToString());
LogLocked(log, server.GetProfile());
}
}
LogLocked(log, "Sync timeouts: {0}; fire and forget: {1}", Interlocked.Read(ref syncTimeouts), Interlocked.Read(ref fireAndForgets));
} }
if (first) if (first)
{ {
LogLocked(log, "Starting heartbeat..."); LogLocked(log, "Starting heartbeat...");
pulse = new Timer(heartbeat, this, MillisecondsPerHeartbeat, MillisecondsPerHeartbeat); pulse = new Timer(heartbeat, this, MillisecondsPerHeartbeat, MillisecondsPerHeartbeat);
ILease lease = pulse.InitializeLifetimeService() as ILease;
if(lease != null) lease.Renew(TimeSpan.FromMinutes(5));
} }
string stormLog = GetStormLog(); string stormLog = GetStormLog();
......
...@@ -5,13 +5,17 @@ ...@@ -5,13 +5,17 @@
/// </summary> /// </summary>
public enum ConnectionType public enum ConnectionType
{ {
/// <summary>
/// Not connection-type related
/// </summary>
None = 0,
/// <summary> /// <summary>
/// An interactive connection handles request/response commands for accessing data on demand /// An interactive connection handles request/response commands for accessing data on demand
/// </summary> /// </summary>
Interactive = 0, Interactive,
/// <summary> /// <summary>
/// A subscriber connection recieves unsolicted messages from the server as pub/sub events occur /// A subscriber connection recieves unsolicted messages from the server as pub/sub events occur
/// </summary> /// </summary>
Subscription = 1 Subscription
} }
} }
using System;
using System.Net;
using System.Text;
namespace StackExchange.Redis
{
/// <summary>
/// Describes internal errors (mainly intended for debugging)
/// </summary>
public class InternalErrorEventArgs : EventArgs, ICompletable
{
private readonly object sender;
private readonly ConnectionType connectionType;
private readonly EndPoint endpoint;
private readonly Exception exception;
private readonly EventHandler<InternalErrorEventArgs> handler;
private readonly string origin;
internal InternalErrorEventArgs(EventHandler<InternalErrorEventArgs> handler, object sender, EndPoint endpoint, ConnectionType connectionType, Exception exception, string origin)
{
this.handler = handler;
this.sender = sender;
this.endpoint = endpoint;
this.connectionType = connectionType;
this.exception = exception;
this.origin = origin;
}
/// <summary>
/// Gets the failing server-endpoint (this can be null)
/// </summary>
public EndPoint EndPoint
{
get { return endpoint; }
}
/// <summary>
/// Gets the connection-type of the failing connection
/// </summary>
public ConnectionType ConnectionType
{
get { return connectionType; }
}
/// <summary>
/// Gets the exception if available (this can be null)
/// </summary>
public Exception Exception
{
get { return exception; }
}
/// <summary>
/// The underlying origin of the error
/// </summary>
public string Origin
{
get { return origin; }
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
void ICompletable.AppendStormLog(StringBuilder sb)
{
sb.Append("event, internal-error: ").Append(origin);
if (endpoint != null) sb.Append(", ").Append(Format.ToString(endpoint));
}
}
}
\ No newline at end of file
...@@ -540,6 +540,7 @@ internal void WriteTo(PhysicalConnection physical) ...@@ -540,6 +540,7 @@ internal void WriteTo(PhysicalConnection physical)
} }
catch (Exception ex) catch (Exception ex)
{ {
if (physical != null) physical.OnInternalError(ex);
Fail(ConnectionFailureType.InternalFailure, ex); Fail(ConnectionFailureType.InternalFailure, ex);
} }
} }
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Net; using System.Net;
using System.Runtime.CompilerServices;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
...@@ -35,7 +36,7 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type) ...@@ -35,7 +36,7 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type)
this.serverEndPoint = serverEndPoint; this.serverEndPoint = serverEndPoint;
this.connectionType = type; this.connectionType = type;
this.multiplexer = serverEndPoint.Multiplexer; this.multiplexer = serverEndPoint.Multiplexer;
this.Name = serverEndPoint.EndPoint.ToString(); this.Name = Format.ToString(serverEndPoint.EndPoint) + "/" + connectionType.ToString();
this.completionManager = new CompletionManager(multiplexer, Name); this.completionManager = new CompletionManager(multiplexer, Name);
} }
...@@ -97,7 +98,7 @@ public void ReportNextFailure() ...@@ -97,7 +98,7 @@ public void ReportNextFailure()
public override string ToString() public override string ToString()
{ {
return connectionType + "/" + serverEndPoint.EndPoint.ToString(); return connectionType + "/" + Format.ToString(serverEndPoint.EndPoint);
} }
public void TryConnect() public void TryConnect()
...@@ -253,7 +254,10 @@ internal void KeepAlive() ...@@ -253,7 +254,10 @@ internal void KeepAlive()
{ {
msg.SetInternalCall(); msg.SetInternalCall();
multiplexer.Trace("Enqueue: " + msg); multiplexer.Trace("Enqueue: " + msg);
TryEnqueue(msg, serverEndPoint.IsSlave); if(!TryEnqueue(msg, serverEndPoint.IsSlave))
{
OnInternalError(ExceptionFactory.NoConnectionAvailable(msg.Command));
}
} }
} }
...@@ -266,7 +270,9 @@ internal void OnConnected(PhysicalConnection connection) ...@@ -266,7 +270,9 @@ internal void OnConnected(PhysicalConnection connection)
} }
else else
{ {
try { connection.Dispose(); } catch { } try {
connection.Dispose();
} catch { }
} }
} }
...@@ -335,7 +341,9 @@ internal int GetPendingCount() ...@@ -335,7 +341,9 @@ internal int GetPendingCount()
{ {
return queue.Count(); return queue.Count();
} }
internal void OnHeartbeat()
internal bool IsBeating { get { return Interlocked.CompareExchange(ref beating, 0, 0) == 1; } }
internal void OnHeartbeat(bool ifConnectedOnly)
{ {
bool runThisTime = false; bool runThisTime = false;
try try
...@@ -355,7 +363,7 @@ internal void OnHeartbeat() ...@@ -355,7 +363,7 @@ internal void OnHeartbeat()
var tmp = physical; var tmp = physical;
if (tmp != null) if (tmp != null)
{ {
tmp.Heartbeat(); tmp.OnHeartbeat();
int writeEverySeconds = serverEndPoint.WriteEverySeconds; int writeEverySeconds = serverEndPoint.WriteEverySeconds;
if (writeEverySeconds > 0 && tmp.LastWriteSecondsAgo >= writeEverySeconds) if (writeEverySeconds > 0 && tmp.LastWriteSecondsAgo >= writeEverySeconds)
{ {
...@@ -374,13 +382,17 @@ internal void OnHeartbeat() ...@@ -374,13 +382,17 @@ internal void OnHeartbeat()
} }
break; break;
case (int)State.Disconnected: case (int)State.Disconnected:
multiplexer.Trace("Resurrecting " + this.ToString()); if (!ifConnectedOnly)
GetConnection(); {
multiplexer.Trace("Resurrecting " + this.ToString());
GetConnection();
}
break; break;
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
OnInternalError(ex);
Trace("OnHeartbeat error: " + ex.Message); Trace("OnHeartbeat error: " + ex.Message);
} }
finally finally
...@@ -389,6 +401,11 @@ internal void OnHeartbeat() ...@@ -389,6 +401,11 @@ internal void OnHeartbeat()
} }
} }
private void OnInternalError(Exception exception, [CallerMemberName] string origin = null)
{
multiplexer.OnInternalError(exception, serverEndPoint.EndPoint, connectionType, origin);
}
internal void RemovePhysical(PhysicalConnection connection) internal void RemovePhysical(PhysicalConnection connection)
{ {
Interlocked.CompareExchange(ref physical, null, connection); Interlocked.CompareExchange(ref physical, null, connection);
...@@ -507,8 +524,10 @@ private void Flush() ...@@ -507,8 +524,10 @@ private void Flush()
Trace(connectionType + " flushed"); Trace(connectionType + " flushed");
tmp.Flush(); tmp.Flush();
} }
catch catch(Exception ex)
{ } {
OnInternalError(ex);
}
} }
} }
...@@ -533,6 +552,7 @@ private PhysicalConnection GetConnection() ...@@ -533,6 +552,7 @@ private PhysicalConnection GetConnection()
{ {
Multiplexer.Trace("Connect failed: " + ex.Message, Name); Multiplexer.Trace("Connect failed: " + ex.Message, Name);
ChangeState(State.Disconnected); ChangeState(State.Disconnected);
OnInternalError(ex);
throw; throw;
} }
} }
...@@ -625,7 +645,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message ...@@ -625,7 +645,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
CompleteSyncOrAsync(message); CompleteSyncOrAsync(message);
// this failed without actually writing; we're OK with that... unless there's a transaction // this failed without actually writing; we're OK with that... unless there's a transaction
if (connection.TransactionActive) if (connection != null && connection.TransactionActive)
{ {
// we left it in a broken state; need to kill the connection // we left it in a broken state; need to kill the connection
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure, ex); connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure, ex);
...@@ -640,7 +660,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message ...@@ -640,7 +660,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
CompleteSyncOrAsync(message); CompleteSyncOrAsync(message);
// we're not sure *what* happened here; kill the connection // we're not sure *what* happened here; kill the connection
connection.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); if(connection != null) connection.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
return false; return false;
} }
} }
...@@ -693,8 +713,10 @@ internal WriteResult WriteQueue(int maxWork) ...@@ -693,8 +713,10 @@ internal WriteResult WriteQueue(int maxWork)
} }
} }
} }
catch catch(Exception ex)
{ } {
OnInternalError(ex);
}
finally finally
{ {
if (weAreWriter) if (weAreWriter)
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
using System.Net; using System.Net;
using System.Net.Security; using System.Net.Security;
using System.Net.Sockets; using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Security.Authentication; using System.Security.Authentication;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
...@@ -78,7 +79,7 @@ public PhysicalConnection(PhysicalBridge bridge) ...@@ -78,7 +79,7 @@ public PhysicalConnection(PhysicalBridge bridge)
this.ChannelPrefix = multiplexer.RawConfig.ChannelPrefix; this.ChannelPrefix = multiplexer.RawConfig.ChannelPrefix;
if (this.ChannelPrefix != null && this.ChannelPrefix.Length == 0) this.ChannelPrefix = null; // null tests are easier than null+empty if (this.ChannelPrefix != null && this.ChannelPrefix.Length == 0) this.ChannelPrefix = null; // null tests are easier than null+empty
var endpoint = bridge.ServerEndPoint.EndPoint; var endpoint = bridge.ServerEndPoint.EndPoint;
physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + endpoint.ToString(); physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + Format.ToString(endpoint);
this.bridge = bridge; this.bridge = bridge;
multiplexer.Trace("Connecting...", physicalName); multiplexer.Trace("Connecting...", physicalName);
...@@ -141,14 +142,16 @@ public void Flush() ...@@ -141,14 +142,16 @@ public void Flush()
} }
int failureReported; int failureReported;
internal void Heartbeat() internal void OnHeartbeat()
{ {
Interlocked.Exchange(ref lastBeatTickCount, Environment.TickCount); Interlocked.Exchange(ref lastBeatTickCount, Environment.TickCount);
} }
public void RecordConnectionFailed(ConnectionFailureType failureType, Exception innerException = null) public void RecordConnectionFailed(ConnectionFailureType failureType, Exception innerException = null, [CallerMemberName] string origin = null)
{ {
IdentifyFailureType(innerException, ref failureType); IdentifyFailureType(innerException, ref failureType);
if(failureType == ConnectionFailureType.InternalFailure) OnInternalError(innerException, origin);
// stop anything new coming in... // stop anything new coming in...
bridge.Trace("Failed: " + failureType); bridge.Trace("Failed: " + failureType);
bool isCurrent; bool isCurrent;
...@@ -165,7 +168,9 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -165,7 +168,9 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType
+ ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetOutstandingCount() + ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetOutstandingCount()
+ ", last-read: " + unchecked(now - lastRead) / 1000 + "s ago, last-write: " + unchecked(now - lastWrite) / 1000 + "s ago, keep-alive: " + bridge.ServerEndPoint.WriteEverySeconds + "s, pending: " + ", last-read: " + unchecked(now - lastRead) / 1000 + "s ago, last-write: " + unchecked(now - lastWrite) / 1000 + "s ago, keep-alive: " + bridge.ServerEndPoint.WriteEverySeconds + "s, pending: "
+ bridge.GetPendingCount() + ", state: " + oldState + ", last-heartbeat: " + (lastBeat == 0 ? "never" : (unchecked(now - lastBeat) / 1000 + "s ago")); + bridge.GetPendingCount() + ", state: " + oldState + ", last-heartbeat: " + (lastBeat == 0 ? "never" : (unchecked(now - lastBeat) / 1000 + "s ago"))
+ (bridge.IsBeating ? " (mid-beat)" : "") + ", last-mbeat: " + multiplexer.LastHeartbeatSecondsAgo + "s ago, global: "
+ ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago";
var ex = innerException == null var ex = innerException == null
? new RedisConnectionException(failureType, message) ? new RedisConnectionException(failureType, message)
...@@ -478,21 +483,22 @@ void ISocketCallback.Connected(Stream stream) ...@@ -478,21 +483,22 @@ void ISocketCallback.Connected(Stream stream)
if (!string.IsNullOrWhiteSpace(config.SslHost)) if (!string.IsNullOrWhiteSpace(config.SslHost))
{ {
var ssl = new SslStream(netStream, false, config.CertificateValidationCallback, config.CertificateSelectionCallback, EncryptionPolicy.RequireEncryption); var ssl = new SslStream(stream, false, config.CertificateValidationCallback, config.CertificateSelectionCallback, EncryptionPolicy.RequireEncryption);
ssl.AuthenticateAsClient(config.SslHost); ssl.AuthenticateAsClient(config.SslHost);
stream = ssl; stream = ssl;
} }
OnWrapForLogging(ref stream, physicalName); OnWrapForLogging(ref stream, physicalName);
this.netStream = stream;
int bufferSize = config.WriteBuffer; int bufferSize = config.WriteBuffer;
outStream = bufferSize <= 0 ? netStream : new BufferedStream(stream, bufferSize); this.netStream = stream;
this.outStream = bufferSize <= 0 ? stream : new BufferedStream(stream, bufferSize);
multiplexer.Trace("Connected", physicalName); multiplexer.Trace("Connected", physicalName);
bridge.OnConnected(this); bridge.OnConnected(this);
} }
catch (Exception ex) catch (Exception ex)
{ {
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); // includes a bridge.OnDisconnected RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); // includes a bridge.OnDisconnected
multiplexer.Trace("Could not connect: " + ex.Message, physicalName); multiplexer.Trace("Could not connect: " + ex.Message, physicalName);
} }
...@@ -529,7 +535,7 @@ void MatchResult(RawResult result) ...@@ -529,7 +535,7 @@ void MatchResult(RawResult result)
blame = Format.TryParseEndPoint(items[2].GetString()); blame = Format.TryParseEndPoint(items[2].GetString());
} }
} }
catch { } catch { /* no biggie */ }
multiplexer.Trace("Configuration changed: " + Format.ToString(blame), physicalName); multiplexer.Trace("Configuration changed: " + Format.ToString(blame), physicalName);
multiplexer.ReconfigureIfNeeded(blame, true, "broadcast"); multiplexer.ReconfigureIfNeeded(blame, true, "broadcast");
} }
...@@ -572,6 +578,11 @@ void MatchResult(RawResult result) ...@@ -572,6 +578,11 @@ void MatchResult(RawResult result)
} }
} }
internal void OnInternalError(Exception exception, [CallerMemberName] string origin = null)
{
multiplexer.OnInternalError(exception, bridge.ServerEndPoint.EndPoint, connectionType, origin);
}
partial void OnCloseEcho(); partial void OnCloseEcho();
partial void OnCreateEcho(); partial void OnCreateEcho();
...@@ -600,6 +611,18 @@ private int ProcessBuffer(byte[] underlying, ref int offset, ref int count) ...@@ -600,6 +611,18 @@ private int ProcessBuffer(byte[] underlying, ref int offset, ref int count)
return messageCount; return messageCount;
} }
void ISocketCallback.OnHeartbeat()
{
try
{
bridge.OnHeartbeat(true); // all the fun code is here
}
catch (Exception ex)
{
OnInternalError(ex);
}
}
void ISocketCallback.Read() void ISocketCallback.Read()
{ {
try try
...@@ -635,6 +658,7 @@ void ISocketCallback.Read() ...@@ -635,6 +658,7 @@ void ISocketCallback.Read()
ioBufferBytes = count; ioBufferBytes = count;
} }
} while (socketToken.Available != 0); } while (socketToken.Available != 0);
multiplexer.Trace("Buffer exhausted", physicalName);
// ^^^ note that the socket manager will call us again when there is something to do // ^^^ note that the socket manager will call us again when there is something to do
} }
catch (Exception ex) catch (Exception ex)
......
...@@ -10,7 +10,7 @@ public abstract class RedisResult ...@@ -10,7 +10,7 @@ public abstract class RedisResult
// internally, this is very similar to RawResult, except it is designed to be usable // internally, this is very similar to RawResult, except it is designed to be usable
// outside of the IO-processing pipeline: the buffers are standalone, etc // outside of the IO-processing pipeline: the buffers are standalone, etc
internal static RedisResult TryCreate(RawResult result) internal static RedisResult TryCreate(PhysicalConnection connection, RawResult result)
{ {
try try
{ {
...@@ -25,7 +25,7 @@ internal static RedisResult TryCreate(RawResult result) ...@@ -25,7 +25,7 @@ internal static RedisResult TryCreate(RawResult result)
var arr = new RedisResult[items.Length]; var arr = new RedisResult[items.Length];
for (int i = 0; i < arr.Length; i++) for (int i = 0; i < arr.Length; i++)
{ {
var next = TryCreate(items[i]); var next = TryCreate(connection, items[i]);
if (next == null) return null; // means we didn't understand if (next == null) return null; // means we didn't understand
arr[i] = next; arr[i] = next;
} }
...@@ -35,8 +35,9 @@ internal static RedisResult TryCreate(RawResult result) ...@@ -35,8 +35,9 @@ internal static RedisResult TryCreate(RawResult result)
default: default:
return null; return null;
} }
} catch } catch(Exception ex)
{ {
if(connection != null) connection.OnInternalError(ex);
return null; // will be logged as a protocol fail by the processor return null; // will be logged as a protocol fail by the processor
} }
} }
......
...@@ -1060,7 +1060,7 @@ public override bool SetResult(PhysicalConnection connection, Message message, R ...@@ -1060,7 +1060,7 @@ public override bool SetResult(PhysicalConnection connection, Message message, R
// (is that a thing?) will be wrapped in the RedisResult // (is that a thing?) will be wrapped in the RedisResult
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{ {
var value = Redis.RedisResult.TryCreate(result); var value = Redis.RedisResult.TryCreate(connection, result);
if(value != null) if(value != null)
{ {
SetResult(message, value); SetResult(message, value);
......
...@@ -186,7 +186,7 @@ public void SetUnselectable(UnselectableFlags flags) ...@@ -186,7 +186,7 @@ public void SetUnselectable(UnselectableFlags flags)
} }
public override string ToString() public override string ToString()
{ {
return EndPoint.ToString(); return Format.ToString(EndPoint);
} }
public bool TryEnqueue(Message message) public bool TryEnqueue(Message message)
...@@ -321,10 +321,16 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -321,10 +321,16 @@ internal void OnFullyEstablished(PhysicalConnection connection)
internal void OnHeartbeat() internal void OnHeartbeat()
{ {
var tmp = interactive; try
if (tmp != null) tmp.OnHeartbeat(); {
tmp = subscription; var tmp = interactive;
if (tmp != null) tmp.OnHeartbeat(); if (tmp != null) tmp.OnHeartbeat(false);
tmp = subscription;
if (tmp != null) tmp.OnHeartbeat(false);
} catch(Exception ex)
{
multiplexer.OnInternalError(ex, EndPoint);
}
} }
......
...@@ -75,7 +75,6 @@ private void AddRead(Socket socket, ISocketCallback callback) ...@@ -75,7 +75,6 @@ private void AddRead(Socket socket, ISocketCallback callback)
if (Interlocked.CompareExchange(ref readerCount, 0, 0) == 0) if (Interlocked.CompareExchange(ref readerCount, 0, 0) == 0)
StartReader(); StartReader();
} }
} }
} }
...@@ -134,10 +133,33 @@ private void ReadImpl() ...@@ -134,10 +133,33 @@ private void ReadImpl()
{ {
List<IntPtr> dead = null, active = new List<IntPtr>(); List<IntPtr> dead = null, active = new List<IntPtr>();
IntPtr[] readSockets = EmptyPointers, errorSockets = EmptyPointers; IntPtr[] readSockets = EmptyPointers, errorSockets = EmptyPointers;
long lastHeartbeat = Environment.TickCount;
SocketPair[] allSocketPairs = null;
while (true) while (true)
{ {
active.Clear(); active.Clear();
if (dead != null) dead.Clear(); if (dead != null) dead.Clear();
// this check is actually a pace-maker; sometimes the Timer callback stalls for
// extended periods of time, which can cause socket disconnect
long now = Environment.TickCount;
if (unchecked(now - lastHeartbeat) >= 15000)
{
lastHeartbeat = now;
lock(socketLookup)
{
if(allSocketPairs == null || allSocketPairs.Length != socketLookup.Count)
allSocketPairs = new SocketPair[socketLookup.Count];
socketLookup.Values.CopyTo(allSocketPairs, 0);
}
foreach(var pair in allSocketPairs)
{
var callback = pair.Callback;
if (callback != null) callback.OnHeartbeat();
}
}
lock (socketLookup) lock (socketLookup)
{ {
if (isDisposed) return; if (isDisposed) return;
...@@ -186,7 +208,7 @@ private void ReadImpl() ...@@ -186,7 +208,7 @@ private void ReadImpl()
int ready; int ready;
try try
{ {
var timeout = new TimeValue(100); var timeout = new TimeValue(1000);
ready = select(0, readSockets, null, errorSockets, ref timeout); ready = select(0, readSockets, null, errorSockets, ref timeout);
if (ready <= 0) if (ready <= 0)
{ {
...@@ -460,9 +482,9 @@ private void EndConnect(IAsyncResult ar) ...@@ -460,9 +482,9 @@ private void EndConnect(IAsyncResult ar)
var socket = tuple.Item1; var socket = tuple.Item1;
var callback = tuple.Item2; var callback = tuple.Item2;
socket.EndConnect(ar); socket.EndConnect(ar);
AddRead(socket, callback);
var netStream = new NetworkStream(socket, false); var netStream = new NetworkStream(socket, false);
callback.Connected(netStream); callback.Connected(netStream);
AddRead(socket, callback);
} }
catch catch
{ {
...@@ -494,6 +516,7 @@ internal interface ISocketCallback ...@@ -494,6 +516,7 @@ internal interface ISocketCallback
/// Indicates that the socket has signalled an error condition /// Indicates that the socket has signalled an error condition
/// </summary> /// </summary>
void Error(); void Error();
void OnHeartbeat();
} }
internal struct SocketToken internal struct SocketToken
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment