Commit 329e5c26 authored by Marc Gravell's avatar Marc Gravell

Better logging during connection init

parent 07c23fbc
...@@ -403,23 +403,23 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options ...@@ -403,23 +403,23 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
internal void LogLocked(TextWriter log, string line) internal void LogLocked(TextWriter log, string line)
{ {
lock (LogSyncLock) { log.WriteLine(line); } if(log != null) lock (LogSyncLock) { log.WriteLine(line); }
} }
internal void LogLocked(TextWriter log, string line, object arg) internal void LogLocked(TextWriter log, string line, object arg)
{ {
lock (LogSyncLock) { log.WriteLine(line, arg); } if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg); }
} }
internal void LogLocked(TextWriter log, string line, object arg0, object arg1) internal void LogLocked(TextWriter log, string line, object arg0, object arg1)
{ {
lock (LogSyncLock) { log.WriteLine(line, arg0, arg1); } if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg0, arg1); }
} }
internal void LogLocked(TextWriter log, string line, object arg0, object arg1, object arg2) internal void LogLocked(TextWriter log, string line, object arg0, object arg1, object arg2)
{ {
lock (LogSyncLock) { log.WriteLine(line, arg0, arg1, arg2); } if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg0, arg1, arg2); }
} }
internal void LogLocked(TextWriter log, string line, params object[] args) internal void LogLocked(TextWriter log, string line, params object[] args)
{ {
lock (LogSyncLock) { log.WriteLine(line, args); } if (log != null) lock (LogSyncLock) { log.WriteLine(line, args); }
} }
internal void CheckMessage(Message message) internal void CheckMessage(Message message)
...@@ -559,7 +559,7 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout) ...@@ -559,7 +559,7 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
} }
return false; return false;
} }
private static async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds) private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, TextWriter log)
{ {
if (tasks == null) throw new ArgumentNullException("tasks"); if (tasks == null) throw new ArgumentNullException("tasks");
if (tasks.Length == 0) return true; if (tasks.Length == 0) return true;
...@@ -576,7 +576,9 @@ private static async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeo ...@@ -576,7 +576,9 @@ private static async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeo
var allTasks = Task.WhenAll(tasks).ObserveErrors(); var allTasks = Task.WhenAll(tasks).ObserveErrors();
var any = Task.WhenAny(allTasks, Task.Delay(timeoutMilliseconds)).ObserveErrors(); var any = Task.WhenAny(allTasks, Task.Delay(timeoutMilliseconds)).ObserveErrors();
#endif #endif
return await any.ForAwait() == allTasks; bool all = await any.ForAwait() == allTasks;
LogLocked(log, all ? "All tasks completed cleanly" : "Not all tasks completed cleanly");
return all;
} }
catch catch
{ } { }
...@@ -589,7 +591,11 @@ private static async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeo ...@@ -589,7 +591,11 @@ private static async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeo
if (!task.IsCanceled && !task.IsCompleted && !task.IsFaulted) if (!task.IsCanceled && !task.IsCompleted && !task.IsFaulted)
{ {
var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds); var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds);
if (remaining <= 0) return false; if (remaining <= 0)
{
LogLocked(log, "Timeout awaiting tasks");
return false;
}
try try
{ {
#if NET40 #if NET40
...@@ -603,6 +609,7 @@ private static async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeo ...@@ -603,6 +609,7 @@ private static async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeo
{ } { }
} }
} }
LogLocked(log, "Finished awaiting tasks");
return false; return false;
} }
...@@ -1073,6 +1080,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1073,6 +1080,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (isDisposed) throw new ObjectDisposedException(ToString());
bool showStats = true; bool showStats = true;
if (log == null) if (log == null)
{ {
log = TextWriter.Null; log = TextWriter.Null;
...@@ -1167,20 +1175,20 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1167,20 +1175,20 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
// so we know that the configuration will be up to date if we see the tracer // so we know that the configuration will be up to date if we see the tracer
server.AutoConfigure(null); server.AutoConfigure(null);
} }
available[i] = server.SendTracer(); available[i] = server.SendTracer(log);
Message msg;
if (useTieBreakers) if (useTieBreakers)
{ {
LogLocked(log, "Requesting tie-break from {0} > {1}...", Format.ToString(server.EndPoint), configuration.TieBreaker); LogLocked(log, "Requesting tie-break from {0} > {1}...", Format.ToString(server.EndPoint), configuration.TieBreaker);
msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey); Message msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey);
msg.SetInternalCall(); msg.SetInternalCall();
msg = new LoggingMessage(log, msg);
tieBreakers[i] = server.QueueDirectAsync(msg, ResultProcessor.String); tieBreakers[i] = server.QueueDirectAsync(msg, ResultProcessor.String);
} }
} }
LogLocked(log, "Allowing endpoints {0} to respond...", TimeSpan.FromMilliseconds(configuration.ConnectTimeout)); LogLocked(log, "Allowing endpoints {0} to respond...", TimeSpan.FromMilliseconds(configuration.ConnectTimeout));
Trace("Allowing endpoints " + TimeSpan.FromMilliseconds(configuration.ConnectTimeout) + " to respond..."); Trace("Allowing endpoints " + TimeSpan.FromMilliseconds(configuration.ConnectTimeout) + " to respond...");
await WaitAllIgnoreErrorsAsync(available, configuration.ConnectTimeout).ForAwait(); await WaitAllIgnoreErrorsAsync(available, configuration.ConnectTimeout, log).ForAwait();
List<ServerEndPoint> masters = new List<ServerEndPoint>(available.Length); List<ServerEndPoint> masters = new List<ServerEndPoint>(available.Length);
for (int i = 0; i < available.Length; i++) for (int i = 0; i < available.Length; i++)
...@@ -1381,7 +1389,7 @@ private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, Serve ...@@ -1381,7 +1389,7 @@ private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, Serve
if (useTieBreakers) if (useTieBreakers)
{ // count the votes { // count the votes
uniques = new Dictionary<string, int>(StringComparer.InvariantCultureIgnoreCase); uniques = new Dictionary<string, int>(StringComparer.InvariantCultureIgnoreCase);
await WaitAllIgnoreErrorsAsync(tieBreakers, 50).ForAwait(); await WaitAllIgnoreErrorsAsync(tieBreakers, 50, log).ForAwait();
for (int i = 0; i < tieBreakers.Length; i++) for (int i = 0; i < tieBreakers.Length; i++)
{ {
var ep = servers[i].EndPoint; var ep = servers[i].EndPoint;
...@@ -1709,7 +1717,7 @@ public async Task CloseAsync(bool allowCommandsToComplete = true) ...@@ -1709,7 +1717,7 @@ public async Task CloseAsync(bool allowCommandsToComplete = true)
if (allowCommandsToComplete) if (allowCommandsToComplete)
{ {
var quits = QuitAllServers(); var quits = QuitAllServers();
await WaitAllIgnoreErrorsAsync(quits, configuration.SyncTimeout).ForAwait(); await WaitAllIgnoreErrorsAsync(quits, configuration.SyncTimeout, null).ForAwait();
} }
DisposeAndClearServers(); DisposeAndClearServers();
......
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Linq; using System.Linq;
using System.Runtime.Serialization; using System.Runtime.Serialization;
using System.Text; using System.Text;
...@@ -79,7 +80,43 @@ public sealed class RedisServerException : RedisException ...@@ -79,7 +80,43 @@ public sealed class RedisServerException : RedisException
internal RedisServerException(string message) : base(message) { } internal RedisServerException(string message) : base(message) { }
} }
sealed class LoggingMessage : Message
{
public readonly TextWriter log;
private readonly Message tail;
public LoggingMessage(TextWriter log, Message tail) : base(tail.Db, tail.Flags, tail.Command)
{
this.log = log;
this.tail = tail;
this.FlagsRaw = tail.FlagsRaw;
}
public override string CommandAndKey
{
get
{
return tail.CommandAndKey;
}
}
public override void AppendStormLog(StringBuilder sb)
{
tail.AppendStormLog(sb);
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
return tail.GetHashSlot(serverSelectionStrategy);
}
internal override void WriteImpl(PhysicalConnection physical)
{
try
{
physical.Multiplexer.LogLocked(log, "Writing to {0}: {1}", physical, tail.CommandAndKey);
}
catch { }
tail.WriteImpl(physical);
}
public TextWriter Log { get { return log; } }
}
abstract class Message : ICompletable abstract class Message : ICompletable
{ {
...@@ -100,7 +137,7 @@ abstract class Message : ICompletable ...@@ -100,7 +137,7 @@ abstract class Message : ICompletable
| CommandFlags.HighPriority | CommandFlags.FireAndForget | CommandFlags.NoRedirect; | CommandFlags.HighPriority | CommandFlags.FireAndForget | CommandFlags.NoRedirect;
private CommandFlags flags; private CommandFlags flags;
internal CommandFlags FlagsRaw { get { return flags; } set { flags = value; } }
private ResultBox resultBox; private ResultBox resultBox;
private ResultProcessor resultProcessor; private ResultProcessor resultProcessor;
......
...@@ -126,6 +126,15 @@ public void SetException(Message message, Exception ex) ...@@ -126,6 +126,15 @@ public void SetException(Message message, Exception ex)
// true if ready to be completed (i.e. false if re-issued to another server) // true if ready to be completed (i.e. false if re-issued to another server)
public virtual bool SetResult(PhysicalConnection connection, Message message, RawResult result) public virtual bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{ {
var logging = message as LoggingMessage;
if (logging != null)
{
try
{
connection.Multiplexer.LogLocked(logging.Log, "Response from {0} / {1}: {2}", connection, message.CommandAndKey, result);
}
catch { }
}
if (result.IsError) if (result.IsError)
{ {
var bridge = connection.Bridge; var bridge = connection.Bridge;
......
using System; using System;
using System.Collections; using System.Collections;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Linq; using System.Linq;
using System.Net; using System.Net;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
...@@ -530,7 +531,8 @@ internal Task<T> QueueDirectAsync<T>(Message message, ResultProcessor<T> process ...@@ -530,7 +531,8 @@ internal Task<T> QueueDirectAsync<T>(Message message, ResultProcessor<T> process
var tcs = TaskSource.CreateDenyExecSync<T>(asyncState); var tcs = TaskSource.CreateDenyExecSync<T>(asyncState);
var source = ResultBox<T>.Get(tcs); var source = ResultBox<T>.Get(tcs);
message.SetSource(processor, source); message.SetSource(processor, source);
if(!(bridge ?? GetBridge(message.Command)).TryEnqueue(message, isSlave)) if (bridge == null) bridge = GetBridge(message.Command);
if (!bridge.TryEnqueue(message, isSlave))
{ {
ConnectionMultiplexer.ThrowFailed(tcs, ExceptionFactory.NoConnectionAvailable(multiplexer.IncludeDetailInExceptions, message.Command, message, this)); ConnectionMultiplexer.ThrowFailed(tcs, ExceptionFactory.NoConnectionAvailable(multiplexer.IncludeDetailInExceptions, message.Command, message, this));
} }
...@@ -555,9 +557,11 @@ internal void ReportNextFailure() ...@@ -555,9 +557,11 @@ internal void ReportNextFailure()
if (tmp != null) tmp.ReportNextFailure(); if (tmp != null) tmp.ReportNextFailure();
} }
internal Task<bool> SendTracer() internal Task<bool> SendTracer(TextWriter log = null)
{ {
return QueueDirectAsync(GetTracerMessage(false), ResultProcessor.Tracer); var msg = GetTracerMessage(false);
if (log != null) msg = new LoggingMessage(log, msg);
return QueueDirectAsync(msg, ResultProcessor.Tracer);
} }
internal string Summary() internal string Summary()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment