Commit 583cf5f4 authored by Marc Gravell's avatar Marc Gravell

More logging during init

parent ba0229ef
...@@ -847,7 +847,7 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint) ...@@ -847,7 +847,7 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint)
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (isDisposed) throw new ObjectDisposedException(ToString());
server = new ServerEndPoint(this, endpoint); server = new ServerEndPoint(this, endpoint, null);
servers.Add(endpoint, server); servers.Add(endpoint, server);
var newSnapshot = serverSnapshot; var newSnapshot = serverSnapshot;
...@@ -1163,7 +1163,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1163,7 +1163,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
serverSnapshot = new ServerEndPoint[configuration.EndPoints.Count]; serverSnapshot = new ServerEndPoint[configuration.EndPoints.Count];
foreach (var endpoint in configuration.EndPoints) foreach (var endpoint in configuration.EndPoints)
{ {
var server = new ServerEndPoint(this, endpoint); var server = new ServerEndPoint(this, endpoint, log);
serverSnapshot[index++] = server; serverSnapshot[index++] = server;
this.servers.Add(endpoint, server); this.servers.Add(endpoint, server);
} }
...@@ -1173,7 +1173,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1173,7 +1173,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
server.Activate(ConnectionType.Interactive, log); server.Activate(ConnectionType.Interactive, log);
if (this.CommandMap.IsAvailable(RedisCommand.SUBSCRIBE)) if (this.CommandMap.IsAvailable(RedisCommand.SUBSCRIBE))
{ {
server.Activate(ConnectionType.Subscription, log); server.Activate(ConnectionType.Subscription, null); // no need to log the SUB stuff
} }
} }
} }
...@@ -1220,7 +1220,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1220,7 +1220,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
LogLocked(log, "Requesting tie-break from {0} > {1}...", Format.ToString(server.EndPoint), configuration.TieBreaker); LogLocked(log, "Requesting tie-break from {0} > {1}...", Format.ToString(server.EndPoint), configuration.TieBreaker);
Message msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey); Message msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey);
msg.SetInternalCall(); msg.SetInternalCall();
msg = new LoggingMessage(log, msg); msg = LoggingMessage.Create(log, msg);
tieBreakers[i] = server.QueueDirectAsync(msg, ResultProcessor.String); tieBreakers[i] = server.QueueDirectAsync(msg, ResultProcessor.String);
} }
} }
......
...@@ -84,7 +84,12 @@ sealed class LoggingMessage : Message ...@@ -84,7 +84,12 @@ sealed class LoggingMessage : Message
{ {
public readonly TextWriter log; public readonly TextWriter log;
private readonly Message tail; private readonly Message tail;
public LoggingMessage(TextWriter log, Message tail) : base(tail.Db, tail.Flags, tail.Command)
public static Message Create(TextWriter log, Message tail)
{
return log == null ? tail : new LoggingMessage(log, tail);
}
private LoggingMessage(TextWriter log, Message tail) : base(tail.Db, tail.Flags, tail.Command)
{ {
this.log = log; this.log = log;
this.tail = tail; this.tail = tail;
...@@ -109,7 +114,7 @@ internal override void WriteImpl(PhysicalConnection physical) ...@@ -109,7 +114,7 @@ internal override void WriteImpl(PhysicalConnection physical)
{ {
try try
{ {
physical.Multiplexer.LogLocked(log, "Writing to {0}: {1}", physical, tail.CommandAndKey); physical.Multiplexer.LogLocked(log, "Writing to {0}: {1}", physical.Bridge, tail.CommandAndKey);
} }
catch { } catch { }
tail.WriteImpl(physical); tail.WriteImpl(physical);
......
...@@ -283,12 +283,12 @@ internal void KeepAlive() ...@@ -283,12 +283,12 @@ internal void KeepAlive()
} }
} }
internal void OnConnected(PhysicalConnection connection) internal void OnConnected(PhysicalConnection connection, TextWriter log)
{ {
Trace("OnConnected"); Trace("OnConnected");
if (physical == connection && !isDisposed && ChangeState(State.Connecting, State.ConnectedEstablishing)) if (physical == connection && !isDisposed && ChangeState(State.Connecting, State.ConnectedEstablishing))
{ {
serverEndPoint.OnEstablishing(connection); serverEndPoint.OnEstablishing(connection, log);
} }
else else
{ {
......
...@@ -648,7 +648,7 @@ static LocalCertificateSelectionCallback GetAmbientCertificateCallback() ...@@ -648,7 +648,7 @@ static LocalCertificateSelectionCallback GetAmbientCertificateCallback()
{ } { }
return null; return null;
} }
SocketMode ISocketCallback.Connected(Stream stream) SocketMode ISocketCallback.Connected(Stream stream, TextWriter log)
{ {
try try
{ {
...@@ -663,6 +663,7 @@ SocketMode ISocketCallback.Connected(Stream stream) ...@@ -663,6 +663,7 @@ SocketMode ISocketCallback.Connected(Stream stream)
if(config.Ssl) if(config.Ssl)
{ {
multiplexer.LogLocked(log, "Configuring SSL");
var host = config.SslHost; var host = config.SslHost;
if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(bridge.ServerEndPoint.EndPoint); if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(bridge.ServerEndPoint.EndPoint);
...@@ -687,9 +688,9 @@ SocketMode ISocketCallback.Connected(Stream stream) ...@@ -687,9 +688,9 @@ SocketMode ISocketCallback.Connected(Stream stream)
int bufferSize = config.WriteBuffer; int bufferSize = config.WriteBuffer;
this.netStream = stream; this.netStream = stream;
this.outStream = bufferSize <= 0 ? stream : new BufferedStream(stream, bufferSize); this.outStream = bufferSize <= 0 ? stream : new BufferedStream(stream, bufferSize);
multiplexer.Trace("Connected", physicalName); multiplexer.LogLocked(log, "Connected {0}", bridge);
bridge.OnConnected(this); bridge.OnConnected(this, log);
return socketMode; return socketMode;
} }
catch (Exception ex) catch (Exception ex)
......
...@@ -131,7 +131,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra ...@@ -131,7 +131,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
{ {
try try
{ {
connection.Multiplexer.LogLocked(logging.Log, "Response from {0} / {1}: {2}", connection, message.CommandAndKey, result); connection.Multiplexer.LogLocked(logging.Log, "Response from {0} / {1}: {2}", connection.Bridge, message.CommandAndKey, result);
} }
catch { } catch { }
} }
......
...@@ -56,7 +56,7 @@ internal void ResetNonConnected() ...@@ -56,7 +56,7 @@ internal void ResetNonConnected()
tmp = subscription; tmp = subscription;
if (tmp != null) tmp.ResetNonConnected(); if (tmp != null) tmp.ResetNonConnected();
} }
public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint) public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint, TextWriter log)
{ {
this.multiplexer = multiplexer; this.multiplexer = multiplexer;
this.endpoint = endpoint; this.endpoint = endpoint;
...@@ -66,7 +66,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint) ...@@ -66,7 +66,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint)
isSlave = false; isSlave = false;
databases = 0; databases = 0;
writeEverySeconds = config.KeepAlive > 0 ? config.KeepAlive : 60; writeEverySeconds = config.KeepAlive > 0 ? config.KeepAlive : 60;
interactive = CreateBridge(ConnectionType.Interactive, null); interactive = CreateBridge(ConnectionType.Interactive, log);
serverType = ServerType.Standalone; serverType = ServerType.Standalone;
// overrides for twemproxy // overrides for twemproxy
...@@ -450,12 +450,12 @@ internal bool IsSelectable(RedisCommand command) ...@@ -450,12 +450,12 @@ internal bool IsSelectable(RedisCommand command)
return bridge != null && bridge.IsConnected; return bridge != null && bridge.IsConnected;
} }
internal void OnEstablishing(PhysicalConnection connection) internal void OnEstablishing(PhysicalConnection connection, TextWriter log)
{ {
try try
{ {
if (connection == null) return; if (connection == null) return;
Handshake(connection); Handshake(connection, log);
} }
catch (Exception ex) catch (Exception ex)
{ {
...@@ -560,7 +560,7 @@ internal void ReportNextFailure() ...@@ -560,7 +560,7 @@ internal void ReportNextFailure()
internal Task<bool> SendTracer(TextWriter log = null) internal Task<bool> SendTracer(TextWriter log = null)
{ {
var msg = GetTracerMessage(false); var msg = GetTracerMessage(false);
if (log != null) msg = new LoggingMessage(log, msg); msg = LoggingMessage.Create(log, msg);
return QueueDirectAsync(msg, ResultProcessor.Tracer); return QueueDirectAsync(msg, ResultProcessor.Tracer);
} }
...@@ -621,9 +621,9 @@ private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log) ...@@ -621,9 +621,9 @@ private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
bridge.TryConnect(log); bridge.TryConnect(log);
return bridge; return bridge;
} }
void Handshake(PhysicalConnection connection) void Handshake(PhysicalConnection connection, TextWriter log)
{ {
multiplexer.Trace("Server handshake"); multiplexer.LogLocked(log, "Server handshake");
if (connection == null) if (connection == null)
{ {
multiplexer.Trace("No connection!?"); multiplexer.Trace("No connection!?");
...@@ -633,7 +633,7 @@ void Handshake(PhysicalConnection connection) ...@@ -633,7 +633,7 @@ void Handshake(PhysicalConnection connection)
string password = multiplexer.RawConfig.Password; string password = multiplexer.RawConfig.Password;
if (!string.IsNullOrWhiteSpace(password)) if (!string.IsNullOrWhiteSpace(password))
{ {
multiplexer.Trace("Sending password"); multiplexer.LogLocked(log, "Authenticating (password)");
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.AUTH, (RedisValue)password); msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.AUTH, (RedisValue)password);
msg.SetInternalCall(); msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.DemandOK); WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.DemandOK);
...@@ -646,7 +646,7 @@ void Handshake(PhysicalConnection connection) ...@@ -646,7 +646,7 @@ void Handshake(PhysicalConnection connection)
name = nameSanitizer.Replace(name, ""); name = nameSanitizer.Replace(name, "");
if (!string.IsNullOrWhiteSpace(name)) if (!string.IsNullOrWhiteSpace(name))
{ {
multiplexer.Trace("Setting client name: " + name); multiplexer.LogLocked(log, "Setting client name: {0}", name);
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.CLIENT, RedisLiterals.SETNAME, (RedisValue)name); msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.CLIENT, RedisLiterals.SETNAME, (RedisValue)name);
msg.SetInternalCall(); msg.SetInternalCall();
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.DemandOK); WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.DemandOK);
...@@ -658,11 +658,13 @@ void Handshake(PhysicalConnection connection) ...@@ -658,11 +658,13 @@ void Handshake(PhysicalConnection connection)
if (connType == ConnectionType.Interactive) if (connType == ConnectionType.Interactive)
{ {
multiplexer.Trace("Auto-configure..."); multiplexer.LogLocked(log, "Auto-configure...");
AutoConfigure(connection); AutoConfigure(connection);
} }
multiplexer.Trace("Sending critical tracer"); multiplexer.LogLocked(log, "Sending critical tracer: {0}", connection.Bridge);
WriteDirectOrQueueFireAndForget(connection, GetTracerMessage(true), ResultProcessor.EstablishConnection); var tracer = GetTracerMessage(true);
tracer = LoggingMessage.Create(log, tracer);
WriteDirectOrQueueFireAndForget(connection, tracer, ResultProcessor.EstablishConnection);
// note: this **must** be the last thing on the subscription handshake, because after this // note: this **must** be the last thing on the subscription handshake, because after this
...@@ -676,7 +678,7 @@ void Handshake(PhysicalConnection connection) ...@@ -676,7 +678,7 @@ void Handshake(PhysicalConnection connection)
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.TrackSubscriptions); WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.TrackSubscriptions);
} }
} }
multiplexer.LogLocked(log, "Flushing outbound buffer");
connection.Flush(); connection.Flush();
} }
......
...@@ -22,7 +22,7 @@ internal partial interface ISocketCallback ...@@ -22,7 +22,7 @@ internal partial interface ISocketCallback
/// <summary> /// <summary>
/// Indicates that a socket has connected /// Indicates that a socket has connected
/// </summary> /// </summary>
SocketMode Connected(Stream stream); SocketMode Connected(Stream stream, TextWriter log);
/// <summary> /// <summary>
/// Indicates that the socket has signalled an error condition /// Indicates that the socket has signalled an error condition
/// </summary> /// </summary>
...@@ -144,7 +144,7 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, C ...@@ -144,7 +144,7 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, C
(ar) => (ar) =>
{ {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint); multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar); EndConnectImpl(ar, multiplexer, log);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint); multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
}, },
connectCompletionType); connectCompletionType);
...@@ -158,7 +158,7 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, C ...@@ -158,7 +158,7 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, C
}, },
(ar) => { (ar) => {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint); multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar); EndConnectImpl(ar, multiplexer, log);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint); multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
}, },
connectCompletionType); connectCompletionType);
...@@ -219,7 +219,7 @@ internal void Shutdown(SocketToken token) ...@@ -219,7 +219,7 @@ internal void Shutdown(SocketToken token)
Shutdown(token.Socket); Shutdown(token.Socket);
} }
private void EndConnectImpl(IAsyncResult ar) private void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multiplexer, TextWriter log)
{ {
Tuple<Socket, ISocketCallback> tuple = null; Tuple<Socket, ISocketCallback> tuple = null;
try try
...@@ -232,15 +232,15 @@ private void EndConnectImpl(IAsyncResult ar) ...@@ -232,15 +232,15 @@ private void EndConnectImpl(IAsyncResult ar)
var callback = tuple.Item2; var callback = tuple.Item2;
socket.EndConnect(ar); socket.EndConnect(ar);
var netStream = new NetworkStream(socket, false); var netStream = new NetworkStream(socket, false);
var socketMode = callback == null ? SocketMode.Abort : callback.Connected(netStream); var socketMode = callback == null ? SocketMode.Abort : callback.Connected(netStream, log);
switch (socketMode) switch (socketMode)
{ {
case SocketMode.Poll: case SocketMode.Poll:
ConnectionMultiplexer.TraceWithoutContext("Starting poll"); multiplexer.LogLocked(log, "Starting poll");
OnAddRead(socket, callback); OnAddRead(socket, callback);
break; break;
case SocketMode.Async: case SocketMode.Async:
ConnectionMultiplexer.TraceWithoutContext("Starting read"); multiplexer.LogLocked(log, "Starting read");
try try
{ callback.StartReading(); } { callback.StartReading(); }
catch (Exception ex) catch (Exception ex)
...@@ -257,7 +257,7 @@ private void EndConnectImpl(IAsyncResult ar) ...@@ -257,7 +257,7 @@ private void EndConnectImpl(IAsyncResult ar)
} }
catch(ObjectDisposedException) catch(ObjectDisposedException)
{ {
ConnectionMultiplexer.TraceWithoutContext("(socket shutdown)"); multiplexer.LogLocked(log, "(socket shutdown)");
if (tuple != null) if (tuple != null)
{ {
try try
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment