Commit deb8b4f4 authored by Marc Gravell's avatar Marc Gravell

More logging during connect

parent 329e5c26
...@@ -562,7 +562,13 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout) ...@@ -562,7 +562,13 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, TextWriter log) private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, TextWriter log)
{ {
if (tasks == null) throw new ArgumentNullException("tasks"); if (tasks == null) throw new ArgumentNullException("tasks");
if (tasks.Length == 0) return true; if (tasks.Length == 0)
{
LogLocked(log, "No tasks to await");
return true;
}
LogLocked(log, "Awaiting task completion...");
var watch = Stopwatch.StartNew(); var watch = Stopwatch.StartNew();
try try
...@@ -1131,10 +1137,10 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1131,10 +1137,10 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
} }
foreach (var server in serverSnapshot) foreach (var server in serverSnapshot)
{ {
server.Activate(ConnectionType.Interactive); server.Activate(ConnectionType.Interactive, log);
if (this.CommandMap.IsAvailable(RedisCommand.SUBSCRIBE)) if (this.CommandMap.IsAvailable(RedisCommand.SUBSCRIBE))
{ {
server.Activate(ConnectionType.Subscription); server.Activate(ConnectionType.Subscription, log);
} }
} }
} }
......
...@@ -122,9 +122,9 @@ public override string ToString() ...@@ -122,9 +122,9 @@ public override string ToString()
return connectionType + "/" + Format.ToString(serverEndPoint.EndPoint); return connectionType + "/" + Format.ToString(serverEndPoint.EndPoint);
} }
public void TryConnect() public void TryConnect(TextWriter log)
{ {
GetConnection(); GetConnection(log);
} }
public bool TryEnqueue(Message message, bool isSlave) public bool TryEnqueue(Message message, bool isSlave)
...@@ -309,7 +309,7 @@ internal void ResetNonConnected() ...@@ -309,7 +309,7 @@ internal void ResetNonConnected()
{ {
tmp.RecordConnectionFailed(ConnectionFailureType.UnableToConnect); tmp.RecordConnectionFailed(ConnectionFailureType.UnableToConnect);
} }
GetConnection(); GetConnection(null);
} }
internal void OnConnectionFailed(PhysicalConnection connection, ConnectionFailureType failureType, Exception innerException) internal void OnConnectionFailed(PhysicalConnection connection, ConnectionFailureType failureType, Exception innerException)
...@@ -345,7 +345,7 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti ...@@ -345,7 +345,7 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti
if (!isDisposed && Interlocked.Increment(ref failConnectCount) == 1) if (!isDisposed && Interlocked.Increment(ref failConnectCount) == 1)
{ {
GetConnection(); // try to connect immediately GetConnection(null); // try to connect immediately
} }
} }
else if (physical == null) else if (physical == null)
...@@ -402,7 +402,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -402,7 +402,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
State oldState; State oldState;
OnDisconnected(ConnectionFailureType.UnableToConnect, snapshot, out isCurrent, out oldState); OnDisconnected(ConnectionFailureType.UnableToConnect, snapshot, out isCurrent, out oldState);
using (snapshot) { } // dispose etc using (snapshot) { } // dispose etc
TryConnect(); TryConnect(null);
} }
if (!ifConnectedOnly) if (!ifConnectedOnly)
{ {
...@@ -456,7 +456,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -456,7 +456,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
{ {
AbortUnsent(); AbortUnsent();
multiplexer.Trace("Resurrecting " + this.ToString()); multiplexer.Trace("Resurrecting " + this.ToString());
GetConnection(); GetConnection(null);
} }
break; break;
default: default:
...@@ -571,7 +571,7 @@ internal WriteResult WriteQueue(int maxWork) ...@@ -571,7 +571,7 @@ internal WriteResult WriteQueue(int maxWork)
return WriteResult.CompetingWriter; return WriteResult.CompetingWriter;
} }
conn = GetConnection(); conn = GetConnection(null);
if (conn == null) if (conn == null)
{ {
AbortUnsent(); AbortUnsent();
...@@ -679,7 +679,7 @@ private bool ChangeState(State oldState, State newState) ...@@ -679,7 +679,7 @@ private bool ChangeState(State oldState, State newState)
return result; return result;
} }
private PhysicalConnection GetConnection() private PhysicalConnection GetConnection(TextWriter log)
{ {
if (state == (int)State.Disconnected) if (state == (int)State.Disconnected)
{ {
...@@ -687,6 +687,7 @@ private PhysicalConnection GetConnection() ...@@ -687,6 +687,7 @@ private PhysicalConnection GetConnection()
{ {
if (!multiplexer.IsDisposed) if (!multiplexer.IsDisposed)
{ {
Multiplexer.LogLocked(log, "Connecting {0}...", Name);
Multiplexer.Trace("Connecting...", Name); Multiplexer.Trace("Connecting...", Name);
if (ChangeState(State.Disconnected, State.Connecting)) if (ChangeState(State.Disconnected, State.Connecting))
{ {
...@@ -695,13 +696,14 @@ private PhysicalConnection GetConnection() ...@@ -695,13 +696,14 @@ private PhysicalConnection GetConnection()
// separate creation and connection for case when connection completes synchronously // separate creation and connection for case when connection completes synchronously
// in that case PhysicalConnection will call back to PhysicalBridge, and most of PhysicalBridge methods assumes that physical is not null; // in that case PhysicalConnection will call back to PhysicalBridge, and most of PhysicalBridge methods assumes that physical is not null;
physical = new PhysicalConnection(this); physical = new PhysicalConnection(this);
physical.BeginConnect(); physical.BeginConnect(log);
} }
} }
return null; return null;
} }
catch (Exception ex) catch (Exception ex)
{ {
Multiplexer.LogLocked(log, "Connect {0} failed: {1}", Name, ex.Message);
Multiplexer.Trace("Connect failed: " + ex.Message, Name); Multiplexer.Trace("Connect failed: " + ex.Message, Name);
ChangeState(State.Disconnected); ChangeState(State.Disconnected);
OnInternalError(ex); OnInternalError(ex);
......
...@@ -91,13 +91,13 @@ public PhysicalConnection(PhysicalBridge bridge) ...@@ -91,13 +91,13 @@ public PhysicalConnection(PhysicalBridge bridge)
OnCreateEcho(); OnCreateEcho();
} }
public void BeginConnect() public void BeginConnect(TextWriter log)
{ {
Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0); Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0);
var endpoint = this.bridge.ServerEndPoint.EndPoint; var endpoint = this.bridge.ServerEndPoint.EndPoint;
multiplexer.Trace("Connecting...", physicalName); multiplexer.Trace("Connecting...", physicalName);
this.socketToken = multiplexer.SocketManager.BeginConnect(endpoint, this); this.socketToken = multiplexer.SocketManager.BeginConnect(endpoint, this, multiplexer, log);
} }
private enum ReadMode : byte private enum ReadMode : byte
......
...@@ -66,7 +66,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint) ...@@ -66,7 +66,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint)
isSlave = false; isSlave = false;
databases = 0; databases = 0;
writeEverySeconds = config.KeepAlive > 0 ? config.KeepAlive : 60; writeEverySeconds = config.KeepAlive > 0 ? config.KeepAlive : 60;
interactive = CreateBridge(ConnectionType.Interactive); interactive = CreateBridge(ConnectionType.Interactive, null);
serverType = ServerType.Standalone; serverType = ServerType.Standalone;
// overrides for twemproxy // overrides for twemproxy
...@@ -147,15 +147,15 @@ public void Dispose() ...@@ -147,15 +147,15 @@ public void Dispose()
if (tmp != null) tmp.Dispose(); if (tmp != null) tmp.Dispose();
} }
public PhysicalBridge GetBridge(ConnectionType type, bool create = true) public PhysicalBridge GetBridge(ConnectionType type, bool create = true, TextWriter log = null)
{ {
if (isDisposed) return null; if (isDisposed) return null;
switch (type) switch (type)
{ {
case ConnectionType.Interactive: case ConnectionType.Interactive:
return interactive ?? (create ? interactive = CreateBridge(ConnectionType.Interactive) : null); return interactive ?? (create ? interactive = CreateBridge(ConnectionType.Interactive, log) : null);
case ConnectionType.Subscription: case ConnectionType.Subscription:
return subscription ?? (create ? subscription = CreateBridge(ConnectionType.Subscription) : null); return subscription ?? (create ? subscription = CreateBridge(ConnectionType.Subscription, log) : null);
} }
return null; return null;
} }
...@@ -168,7 +168,7 @@ public PhysicalBridge GetBridge(RedisCommand command, bool create = true) ...@@ -168,7 +168,7 @@ public PhysicalBridge GetBridge(RedisCommand command, bool create = true)
case RedisCommand.UNSUBSCRIBE: case RedisCommand.UNSUBSCRIBE:
case RedisCommand.PSUBSCRIBE: case RedisCommand.PSUBSCRIBE:
case RedisCommand.PUNSUBSCRIBE: case RedisCommand.PUNSUBSCRIBE:
return subscription ?? (create ? subscription = CreateBridge(ConnectionType.Subscription) : null); return subscription ?? (create ? subscription = CreateBridge(ConnectionType.Subscription, null) : null);
default: default:
return interactive; return interactive;
} }
...@@ -236,9 +236,9 @@ public bool TryEnqueue(Message message) ...@@ -236,9 +236,9 @@ public bool TryEnqueue(Message message)
return bridge != null && bridge.TryEnqueue(message, isSlave); return bridge != null && bridge.TryEnqueue(message, isSlave);
} }
internal void Activate(ConnectionType type) internal void Activate(ConnectionType type, TextWriter log)
{ {
GetBridge(type, true); GetBridge(type, true, log);
} }
internal void AddScript(string script, byte[] hash) internal void AddScript(string script, byte[] hash)
...@@ -614,11 +614,11 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection, ...@@ -614,11 +614,11 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
} }
} }
private PhysicalBridge CreateBridge(ConnectionType type) private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
{ {
multiplexer.Trace(type.ToString()); multiplexer.Trace(type.ToString());
var bridge = new PhysicalBridge(this, type); var bridge = new PhysicalBridge(this, type);
bridge.TryConnect(); bridge.TryConnect(log);
return bridge; return bridge;
} }
void Handshake(PhysicalConnection connection) void Handshake(PhysicalConnection connection)
......
...@@ -121,7 +121,7 @@ public void Dispose() ...@@ -121,7 +121,7 @@ public void Dispose()
OnDispose(); OnDispose();
} }
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback) internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log)
{ {
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
SetFastLoopbackOption(socket); SetFastLoopbackOption(socket);
...@@ -131,21 +131,37 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback) ...@@ -131,21 +131,37 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback)
CompletionType connectCompletionType = CompletionType.Any; CompletionType connectCompletionType = CompletionType.Any;
this.ShouldForceConnectCompletionType(ref connectCompletionType); this.ShouldForceConnectCompletionType(ref connectCompletionType);
var formattedEndpoint = Format.ToString(endpoint);
if (endpoint is DnsEndPoint) if (endpoint is DnsEndPoint)
{ {
// A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state) // A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state)
DnsEndPoint dnsEndpoint = (DnsEndPoint)endpoint; DnsEndPoint dnsEndpoint = (DnsEndPoint)endpoint;
CompletionTypeHelper.RunWithCompletionType( CompletionTypeHelper.RunWithCompletionType(
(cb) => socket.BeginConnect(dnsEndpoint.Host, dnsEndpoint.Port, cb, Tuple.Create(socket, callback)), (cb) =>
(ar) => EndConnectImpl(ar), {
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
return socket.BeginConnect(dnsEndpoint.Host, dnsEndpoint.Port, cb, Tuple.Create(socket, callback));
},
(ar) =>
{
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
},
connectCompletionType); connectCompletionType);
} }
else else
{ {
CompletionTypeHelper.RunWithCompletionType( CompletionTypeHelper.RunWithCompletionType(
(cb) => socket.BeginConnect(endpoint, cb, Tuple.Create(socket, callback)), (cb) => {
(ar) => EndConnectImpl(ar), multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
return socket.BeginConnect(endpoint, cb, Tuple.Create(socket, callback));
},
(ar) => {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
},
connectCompletionType); connectCompletionType);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment