Unverified Commit bb981525 authored by Marc Gravell's avatar Marc Gravell Committed by GitHub

fix #1108 - introduce LogProxy as an intermediary between the TextWriter; move...

fix #1108 - introduce LogProxy as an intermediary between the TextWriter; move the sync to there - allows safe detach from the logging (#1116)
parent 93ee0fb6
......@@ -9,6 +9,7 @@
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
using static StackExchange.Redis.ConnectionMultiplexer;
namespace StackExchange.Redis
{
......@@ -523,7 +524,7 @@ internal bool HasDnsEndPoints()
return false;
}
internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, TextWriter log)
internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, LogProxy log)
{
var cache = new Dictionary<string, IPAddress>(StringComparer.OrdinalIgnoreCase);
for (int i = 0; i < EndPoints.Count; i++)
......@@ -542,12 +543,12 @@ internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, Tex
}
else
{
multiplexer.LogLocked(log, "Using DNS to resolve '{0}'...", dns.Host);
log?.WriteLine($"Using DNS to resolve '{dns.Host}'...");
var ips = await Dns.GetHostAddressesAsync(dns.Host).ObserveErrors().ForAwait();
if (ips.Length == 1)
{
ip = ips[0];
multiplexer.LogLocked(log, "'{0}' => {1}", dns.Host, ip);
log?.WriteLine($"'{dns.Host}' => {ip}");
cache[dns.Host] = ip;
EndPoints[i] = new IPEndPoint(ip, dns.Port);
}
......@@ -556,7 +557,7 @@ internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, Tex
catch (Exception ex)
{
multiplexer.OnInternalError(ex);
multiplexer.LogLocked(log, ex.Message);
log?.WriteLine(ex.Message);
}
}
}
......
......@@ -329,7 +329,7 @@ public void ExportConfiguration(Stream destination, ExportOptions options = Expo
}
}
internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options, TextWriter log)
internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options, LogProxy log)
{
CommandMap.AssertAvailable(RedisCommand.SLAVEOF);
if (!RawConfig.AllowAdmin) throw ExceptionFactory.AdminModeNotEnabled(IncludeDetailInExceptions, RedisCommand.SLAVEOF, null, server);
......@@ -338,21 +338,20 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
var srv = new RedisServer(this, server, null);
if (!srv.IsConnected) throw ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, RedisCommand.SLAVEOF, null, server, GetServerSnapshot());
if (log == null) log = TextWriter.Null;
CommandMap.AssertAvailable(RedisCommand.SLAVEOF);
#pragma warning disable CS0618
const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.HighPriority;
#pragma warning restore CS0618
Message msg;
LogLocked(log, "Checking {0} is available...", Format.ToString(srv.EndPoint));
log?.WriteLine($"Checking {Format.ToString(srv.EndPoint)} is available...");
try
{
srv.Ping(flags); // if it isn't happy, we're not happy
}
catch (Exception ex)
{
LogLocked(log, "Operation failed on {0}, aborting: {1}", Format.ToString(srv.EndPoint), ex.Message);
log?.WriteLine($"Operation failed on {Format.ToString(srv.EndPoint)}, aborting: {ex.Message}");
throw;
}
......@@ -369,7 +368,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
foreach (var node in nodes)
{
if (!node.IsConnected) continue;
LogLocked(log, "Attempting to set tie-breaker on {0}...", Format.ToString(node.EndPoint));
log?.WriteLine($"Attempting to set tie-breaker on {Format.ToString(node.EndPoint)}...");
msg = Message.Create(0, flags, RedisCommand.SET, tieBreakerKey, newMaster);
#pragma warning disable CS0618
node.WriteDirectFireAndForgetSync(msg, ResultProcessor.DemandOK);
......@@ -378,21 +377,21 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
}
// deslave...
LogLocked(log, "Making {0} a master...", Format.ToString(srv.EndPoint));
log?.WriteLine($"Making {Format.ToString(srv.EndPoint)} a master...");
try
{
srv.SlaveOf(null, flags);
}
catch (Exception ex)
{
LogLocked(log, "Operation failed on {0}, aborting: {1}", Format.ToString(srv.EndPoint), ex.Message);
log?.WriteLine($"Operation failed on {Format.ToString(srv.EndPoint)}, aborting: {ex.Message}");
throw;
}
// also, in case it was a slave a moment ago, and hasn't got the tie-breaker yet, we re-send the tie-breaker to this one
if (!tieBreakerKey.IsNull)
{
LogLocked(log, "Resending tie-breaker to {0}...", Format.ToString(server.EndPoint));
log?.WriteLine($"Resending tie-breaker to {Format.ToString(server.EndPoint)}...");
msg = Message.Create(0, flags, RedisCommand.SET, tieBreakerKey, newMaster);
#pragma warning disable CS0618
server.WriteDirectFireAndForgetSync(msg, ResultProcessor.DemandOK);
......@@ -414,7 +413,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
foreach (var node in nodes)
{
if (!node.IsConnected) continue;
LogLocked(log, "Broadcasting via {0}...", Format.ToString(node.EndPoint));
log?.WriteLine($"Broadcasting via {Format.ToString(node.EndPoint)}...");
msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, newMaster);
#pragma warning disable CS0618
node.WriteDirectFireAndForgetSync(msg, ResultProcessor.Int64);
......@@ -428,7 +427,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
{
if (node == server || node.ServerType != ServerType.Standalone) continue;
LogLocked(log, "Enslaving {0}...", Format.ToString(node.EndPoint));
log?.WriteLine($"Enslaving {Format.ToString(node.EndPoint)}...");
msg = RedisServer.CreateSlaveOfMessage(server.EndPoint, flags);
#pragma warning disable CS0618
node.WriteDirectFireAndForgetSync(msg, ResultProcessor.DemandOK);
......@@ -437,7 +436,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
}
// and reconfigure the muxer
LogLocked(log, "Reconfiguring all endpoints...");
log?.WriteLine("Reconfiguring all endpoints...");
// Yes, there is a tiny latency race possible between this code and the next call, but it's far more minute than before.
// The effective gap between 0 and > 0 (likely off-box) latency is something that may never get hit here by anyone.
if (blockingReconfig)
......@@ -446,42 +445,10 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
}
if (!ReconfigureAsync(false, true, log, srv.EndPoint, "make master").ObserveErrors().Wait(5000))
{
LogLocked(log, "Verifying the configuration was incomplete; please verify");
log?.WriteLine("Verifying the configuration was incomplete; please verify");
}
}
/// <summary>
/// Used internally to synchronize loggine without depending on locking the log instance
/// </summary>
private object LogSyncLock => UniqueId;
// we know this has strong identity: readonly and unique to us
internal void LogLocked(TextWriter log, string line)
{
if (log != null) lock (LogSyncLock) { log.WriteLine(line); }
}
internal void LogLocked(TextWriter log, string line, object arg)
{
if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg); }
}
internal void LogLocked(TextWriter log, string line, object arg0, object arg1)
{
if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg0, arg1); }
}
internal void LogLocked(TextWriter log, string line, object arg0, object arg1, object arg2)
{
if (log != null) lock (LogSyncLock) { log.WriteLine(line, arg0, arg1, arg2); }
}
internal void LogLocked(TextWriter log, string line, params object[] args)
{
if (log != null) lock (LogSyncLock) { log.WriteLine(line, args); }
}
internal void CheckMessage(Message message)
{
if (!RawConfig.AllowAdmin && message.IsAdmin)
......@@ -650,7 +617,7 @@ private static bool WaitAllIgnoreErrors(Task[] tasks, int timeout)
internal bool AuthSuspect { get; private set; }
internal void SetAuthSuspect() => AuthSuspect = true;
private void LogLockedWithThreadPoolStats(TextWriter log, string message, out int busyWorkerCount)
private static void LogWithThreadPoolStats(LogProxy log, string message, out int busyWorkerCount)
{
busyWorkerCount = 0;
if (log != null)
......@@ -659,7 +626,7 @@ private void LogLockedWithThreadPoolStats(TextWriter log, string message, out in
sb.Append(message);
busyWorkerCount = PerfCounterHelper.GetThreadPoolStats(out string iocp, out string worker);
sb.Append(", IOCP: ").Append(iocp).Append(", WORKER: ").Append(worker);
LogLocked(log, sb.ToString());
log?.WriteLine(sb.ToString());
}
}
......@@ -674,36 +641,36 @@ private static bool AllComplete(Task[] tasks)
return true;
}
private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, TextWriter log, [CallerMemberName] string caller = null, [CallerLineNumber] int callerLineNumber = 0)
private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilliseconds, LogProxy log, [CallerMemberName] string caller = null, [CallerLineNumber] int callerLineNumber = 0)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
if (tasks.Length == 0)
{
LogLocked(log, "No tasks to await");
log?.WriteLine("No tasks to await");
return true;
}
if (AllComplete(tasks))
{
LogLocked(log, "All tasks are already complete");
log?.WriteLine("All tasks are already complete");
return true;
}
var watch = Stopwatch.StartNew();
LogLockedWithThreadPoolStats(log, "Awaiting task completion", out _);
LogWithThreadPoolStats(log, "Awaiting task completion", out _);
try
{
// if none error, great
var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds);
if (remaining <= 0)
{
LogLockedWithThreadPoolStats(log, "Timeout before awaiting for tasks", out _);
LogWithThreadPoolStats(log, "Timeout before awaiting for tasks", out _);
return false;
}
var allTasks = Task.WhenAll(tasks).ObserveErrors();
bool all = await allTasks.TimeoutAfter(timeoutMs: remaining).ObserveErrors().ForAwait();
LogLockedWithThreadPoolStats(log, all ? "All tasks completed cleanly" : $"Not all tasks completed cleanly (from {caller}#{callerLineNumber}, timeout {timeoutMilliseconds}ms)", out _);
LogWithThreadPoolStats(log, all ? "All tasks completed cleanly" : $"Not all tasks completed cleanly (from {caller}#{callerLineNumber}, timeout {timeoutMilliseconds}ms)", out _);
return all;
}
catch
......@@ -719,7 +686,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
var remaining = timeoutMilliseconds - checked((int)watch.ElapsedMilliseconds);
if (remaining <= 0)
{
LogLockedWithThreadPoolStats(log, "Timeout awaiting tasks", out _);
LogWithThreadPoolStats(log, "Timeout awaiting tasks", out _);
return false;
}
try
......@@ -730,7 +697,7 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
{ }
}
}
LogLockedWithThreadPoolStats(log, "Finished awaiting tasks", out _);
LogWithThreadPoolStats(log, "Finished awaiting tasks", out _);
return false;
}
......@@ -813,22 +780,25 @@ private static async Task<ConnectionMultiplexer> ConnectImplAsync(object configu
IDisposable killMe = null;
EventHandler<ConnectionFailedEventArgs> connectHandler = null;
ConnectionMultiplexer muxer = null;
try
using (var logProxy = LogProxy.TryCreate(log))
{
muxer = CreateMultiplexer(configuration, log, out connectHandler);
killMe = muxer;
bool configured = await muxer.ReconfigureAsync(true, false, log, null, "connect").ObserveErrors().ForAwait();
if (!configured)
try
{
throw ExceptionFactory.UnableToConnect(muxer, muxer.failureMessage);
muxer = CreateMultiplexer(configuration, logProxy, out connectHandler);
killMe = muxer;
bool configured = await muxer.ReconfigureAsync(true, false, logProxy, null, "connect").ObserveErrors().ForAwait();
if (!configured)
{
throw ExceptionFactory.UnableToConnect(muxer, muxer.failureMessage);
}
killMe = null;
return muxer;
}
finally
{
if (connectHandler != null) muxer.ConnectionFailed -= connectHandler;
if (killMe != null) try { killMe.Dispose(); } catch { }
}
killMe = null;
return muxer;
}
finally
{
if (connectHandler != null) muxer.ConnectionFailed -= connectHandler;
if (killMe != null) try { killMe.Dispose(); } catch { }
}
}
......@@ -863,7 +833,57 @@ internal static ConfigurationOptions PrepareConfig(object configuration)
config.SetDefaultPorts();
return config;
}
private static ConnectionMultiplexer CreateMultiplexer(object configuration, TextWriter log, out EventHandler<ConnectionFailedEventArgs> connectHandler)
internal class LogProxy : IDisposable
{
public static LogProxy TryCreate(TextWriter writer)
=> writer == null ? null : new LogProxy(writer);
public override string ToString()
{
string s = null;
if (_log != null)
{
lock(SyncLock)
{
s = _log?.ToString();
}
}
return s ?? base.ToString();
}
private TextWriter _log;
public object SyncLock => this;
private LogProxy(TextWriter log) => _log = log;
public void WriteLine()
{
if (_log != null) // note: double-checked
{
lock (SyncLock)
{
_log?.WriteLine();
}
}
}
public void WriteLine(string message = null)
{
if (_log != null) // note: double-checked
{
lock (SyncLock)
{
_log?.WriteLine(message);
}
}
}
public void Dispose()
{
if (_log != null) // note: double-checked
{
lock (SyncLock) { _log = null; }
}
}
}
private static ConnectionMultiplexer CreateMultiplexer(object configuration, LogProxy log, out EventHandler<ConnectionFailedEventArgs> connectHandler)
{
var muxer = new ConnectionMultiplexer(PrepareConfig(configuration));
connectHandler = null;
......@@ -874,14 +894,13 @@ private static ConnectionMultiplexer CreateMultiplexer(object configuration, Tex
{
try
{
lock (muxer.LogSyncLock) // keep the outer and any inner errors contiguous
lock (log.SyncLock) // keep the outer and any inner errors contiguous
{
var ex = a.Exception;
log.WriteLine($"connection failed: {Format.ToString(a.EndPoint)} ({a.ConnectionType}, {a.FailureType}): {ex?.Message ?? "(unknown)"}");
log?.WriteLine($"connection failed: {Format.ToString(a.EndPoint)} ({a.ConnectionType}, {a.FailureType}): {ex?.Message ?? "(unknown)"}");
while ((ex = ex.InnerException) != null)
{
log.Write("> ");
log.WriteLine(ex.Message);
log?.WriteLine($"> {ex.Message}");
}
}
}
......@@ -919,33 +938,36 @@ private static ConnectionMultiplexer ConnectImpl(object configuration, TextWrite
IDisposable killMe = null;
EventHandler<ConnectionFailedEventArgs> connectHandler = null;
ConnectionMultiplexer muxer = null;
try
using (var logProxy = LogProxy.TryCreate(log))
{
muxer = CreateMultiplexer(configuration, log, out connectHandler);
killMe = muxer;
// note that task has timeouts internally, so it might take *just over* the regular timeout
var task = muxer.ReconfigureAsync(true, false, log, null, "connect");
if (!task.Wait(muxer.SyncConnectTimeout(true)))
try
{
task.ObserveErrors();
if (muxer.RawConfig.AbortOnConnectFail)
{
throw ExceptionFactory.UnableToConnect(muxer, "ConnectTimeout");
}
else
muxer = CreateMultiplexer(configuration, logProxy, out connectHandler);
killMe = muxer;
// note that task has timeouts internally, so it might take *just over* the regular timeout
var task = muxer.ReconfigureAsync(true, false, logProxy, null, "connect");
if (!task.Wait(muxer.SyncConnectTimeout(true)))
{
muxer.LastException = ExceptionFactory.UnableToConnect(muxer, "ConnectTimeout");
task.ObserveErrors();
if (muxer.RawConfig.AbortOnConnectFail)
{
throw ExceptionFactory.UnableToConnect(muxer, "ConnectTimeout");
}
else
{
muxer.LastException = ExceptionFactory.UnableToConnect(muxer, "ConnectTimeout");
}
}
if (!task.Result) throw ExceptionFactory.UnableToConnect(muxer, muxer.failureMessage);
killMe = null;
return muxer;
}
finally
{
if (connectHandler != null) muxer.ConnectionFailed -= connectHandler;
if (killMe != null) try { killMe.Dispose(); } catch { }
}
if (!task.Result) throw ExceptionFactory.UnableToConnect(muxer, muxer.failureMessage);
killMe = null;
return muxer;
}
finally
{
if (connectHandler != null) muxer.ConnectionFailed -= connectHandler;
if (killMe != null) try { killMe.Dispose(); } catch { }
}
}
......@@ -1000,7 +1022,7 @@ internal EndPoint[] GetEndPoints()
return arr;
}
}
internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, TextWriter log = null, bool activate = true)
internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, LogProxy log = null, bool activate = true)
{
if (endpoint == null) return null;
var server = (ServerEndPoint)servers[endpoint];
......@@ -1292,9 +1314,12 @@ internal bool ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cau
/// Reconfigure the current connections based on the existing configuration
/// </summary>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public Task<bool> ConfigureAsync(TextWriter log = null)
public async Task<bool> ConfigureAsync(TextWriter log = null)
{
return ReconfigureAsync(false, true, log, null, "configure").ObserveErrors();
using (var logProxy = LogProxy.TryCreate(log))
{
return await ReconfigureAsync(false, true, logProxy, null, "configure").ObserveErrors();
}
}
/// <summary>
......@@ -1305,21 +1330,24 @@ public bool Configure(TextWriter log = null)
{
// note we expect ReconfigureAsync to internally allow [n] duration,
// so to avoid near misses, here we wait 2*[n]
var task = ReconfigureAsync(false, true, log, null, "configure");
if (!task.Wait(SyncConnectTimeout(false)))
using (var logProxy = LogProxy.TryCreate(log))
{
task.ObserveErrors();
if (RawConfig.AbortOnConnectFail)
var task = ReconfigureAsync(false, true, logProxy, null, "configure");
if (!task.Wait(SyncConnectTimeout(false)))
{
throw new TimeoutException();
}
else
{
LastException = new TimeoutException("ConnectTimeout");
task.ObserveErrors();
if (RawConfig.AbortOnConnectFail)
{
throw new TimeoutException();
}
else
{
LastException = new TimeoutException("ConnectTimeout");
}
return false;
}
return false;
return task.Result;
}
return task.Result;
}
internal int SyncConnectTimeout(bool forConnect)
......@@ -1352,22 +1380,27 @@ public string GetStatus()
/// </summary>
/// <param name="log">The <see cref="TextWriter"/> to log to.</param>
public void GetStatus(TextWriter log)
{
using (var proxy = LogProxy.TryCreate(log))
{
GetStatus(proxy);
}
}
internal void GetStatus(LogProxy log)
{
if (log == null) return;
var tmp = GetServerSnapshot();
foreach (var server in tmp)
{
LogLocked(log, server.Summary());
LogLocked(log, server.GetCounters().ToString());
LogLocked(log, server.GetProfile());
log?.WriteLine(server.Summary());
log?.WriteLine(server.GetCounters().ToString());
log?.WriteLine(server.GetProfile());
}
LogLocked(log, "Sync timeouts: {0}; async timeouts: {1}; fire and forget: {2}; last heartbeat: {3}s ago",
Interlocked.Read(ref syncTimeouts), Interlocked.Read(ref asyncTimeouts),
Interlocked.Read(ref fireAndForgets), LastHeartbeatSecondsAgo);
log?.WriteLine($"Sync timeouts: {Interlocked.Read(ref syncTimeouts)}; async timeouts: {Interlocked.Read(ref asyncTimeouts)}; fire and forget: {Interlocked.Read(ref fireAndForgets)}; last heartbeat: {LastHeartbeatSecondsAgo}s ago");
}
private void ActivateAllServers(TextWriter log)
private void ActivateAllServers(LogProxy log)
{
foreach (var server in GetServerSnapshot())
{
......@@ -1378,16 +1411,11 @@ private void ActivateAllServers(TextWriter log)
}
}
}
internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause, bool publishReconfigure = false, CommandFlags publishReconfigureFlags = CommandFlags.None)
internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogProxy log, EndPoint blame, string cause, bool publishReconfigure = false, CommandFlags publishReconfigureFlags = CommandFlags.None)
{
if (_isDisposed) throw new ObjectDisposedException(ToString());
bool showStats = true;
bool showStats = log is object;
if (log == null)
{
log = TextWriter.Null;
showStats = false;
}
bool ranThisCall = false;
try
{ // note that "activeReconfigs" starts at one; we don't need to set it the first time
......@@ -1395,14 +1423,14 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
if (!ranThisCall)
{
LogLocked(log, "Reconfiguration was already in progress due to: " + activeConfigCause + ", attempted to run for: " + cause);
log?.WriteLine($"Reconfiguration was already in progress due to: {activeConfigCause}, attempted to run for: {cause}");
return false;
}
Trace("Starting reconfiguration...");
Trace(blame != null, "Blaming: " + Format.ToString(blame));
LogLocked(log, RawConfig.ToString(includePassword: false));
LogLocked(log, "");
log?.WriteLine(RawConfig.ToString(includePassword: false));
log?.WriteLine();
if (first)
{
......@@ -1431,7 +1459,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
}
int standaloneCount = 0, clusterCount = 0, sentinelCount = 0;
var endpoints = RawConfig.EndPoints;
LogLocked(log, "{0} unique nodes specified", endpoints.Count);
log?.WriteLine($"{endpoints.Count} unique nodes specified");
if (endpoints.Count == 0)
{
......@@ -1473,7 +1501,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
servers[i] = server;
if (reconfigureAll && server.IsConnected)
{
LogLocked(log, "Refreshing {0}...", Format.ToString(server.EndPoint));
log?.WriteLine($"Refreshing {Format.ToString(server.EndPoint)}...");
// note that these will be processed synchronously *BEFORE* the tracer is processed,
// so we know that the configuration will be up to date if we see the tracer
server.AutoConfigure(null);
......@@ -1481,7 +1509,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
available[i] = server.SendTracer(log);
if (useTieBreakers)
{
LogLocked(log, "Requesting tie-break from {0} > {1}...", Format.ToString(server.EndPoint), RawConfig.TieBreaker);
log?.WriteLine($"Requesting tie-break from {Format.ToString(server.EndPoint)} > {RawConfig.TieBreaker}...");
Message msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey);
msg.SetInternalCall();
msg = LoggingMessage.Create(log, msg);
......@@ -1491,7 +1519,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
watch = watch ?? Stopwatch.StartNew();
var remaining = RawConfig.ConnectTimeout - checked((int)watch.ElapsedMilliseconds);
LogLocked(log, "Allowing endpoints {0} to respond...", TimeSpan.FromMilliseconds(remaining));
log?.WriteLine($"Allowing endpoints {TimeSpan.FromMilliseconds(remaining)} to respond...");
Trace("Allowing endpoints " + TimeSpan.FromMilliseconds(remaining) + " to respond...");
await WaitAllIgnoreErrorsAsync(available, remaining, log).ForAwait();
......@@ -1506,14 +1534,14 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
var aex = task.Exception;
foreach (var ex in aex.InnerExceptions)
{
LogLocked(log, "{0} faulted: {1}", Format.ToString(endpoints[i]), ex.Message);
log?.WriteLine($"{Format.ToString(endpoints[i])} faulted: {ex.Message}");
failureMessage = ex.Message;
}
}
else if (task.IsCanceled)
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} was canceled", Format.ToString(endpoints[i]));
log?.WriteLine($"{Format.ToString(endpoints[i])} was canceled");
}
else if (task.IsCompleted)
{
......@@ -1521,7 +1549,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
if (task.Result)
{
servers[i].ClearUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned with success", Format.ToString(endpoints[i]));
log?.WriteLine($"{Format.ToString(endpoints[i])} returned with success");
// count the server types
switch (server.ServerType)
......@@ -1572,13 +1600,13 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
else
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} returned, but incorrectly", Format.ToString(endpoints[i]));
log?.WriteLine($"{Format.ToString(endpoints[i])} returned, but incorrectly");
}
}
else
{
servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
LogLocked(log, "{0} did not respond", Format.ToString(endpoints[i]));
log?.WriteLine($"{Format.ToString(endpoints[i])} did not respond");
}
}
......@@ -1624,19 +1652,18 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
{
ServerSelectionStrategy.ServerType = ServerType.Cluster;
long coveredSlots = ServerSelectionStrategy.CountCoveredSlots();
LogLocked(log, "Cluster: {0} of {1} slots covered",
coveredSlots, ServerSelectionStrategy.TotalSlots);
log?.WriteLine($"Cluster: {coveredSlots} of {ServerSelectionStrategy.TotalSlots} slots covered");
}
if (!first)
{
long subscriptionChanges = ValidateSubscriptions();
if (subscriptionChanges == 0)
{
LogLocked(log, "No subscription changes necessary");
log?.WriteLine("No subscription changes necessary");
}
else
{
LogLocked(log, "Subscriptions reconfigured: {0}", subscriptionChanges);
log?.WriteLine($"Subscriptions reconfigured: {subscriptionChanges}");
}
}
if (showStats)
......@@ -1647,15 +1674,15 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
string stormLog = GetStormLog();
if (!string.IsNullOrWhiteSpace(stormLog))
{
LogLocked(log, "");
LogLocked(log, stormLog);
log?.WriteLine();
log?.WriteLine(stormLog);
}
healthy = standaloneCount != 0 || clusterCount != 0 || sentinelCount != 0;
if (first && !healthy && attemptsLeft > 0)
{
LogLocked(log, "resetting failing connections to retry...");
log?.WriteLine("resetting failing connections to retry...");
ResetAllNonConnected();
LogLocked(log, "retrying; attempts left: " + attemptsLeft + "...");
log?.WriteLine($"retrying; attempts left: {attemptsLeft}...");
}
//WTF("?: " + attempts);
} while (first && !healthy && attemptsLeft > 0);
......@@ -1666,14 +1693,14 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
}
if (first)
{
LogLocked(log, "Starting heartbeat...");
log?.WriteLine("Starting heartbeat...");
pulse = TimerToken.Create(this);
}
if (publishReconfigure)
{
try
{
LogLocked(log, "Broadcasting reconfigure...");
log?.WriteLine("Broadcasting reconfigure...");
PublishReconfigureImpl(publishReconfigureFlags);
}
catch
......@@ -1696,7 +1723,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
}
}
private async Task<EndPointCollection> GetEndpointsFromClusterNodes(ServerEndPoint server, TextWriter log)
private async Task<EndPointCollection> GetEndpointsFromClusterNodes(ServerEndPoint server, LogProxy log)
{
var message = Message.Create(-1, CommandFlags.None, RedisCommand.CLUSTER, RedisLiterals.NODES);
try
......@@ -1706,7 +1733,7 @@ private async Task<EndPointCollection> GetEndpointsFromClusterNodes(ServerEndPoi
}
catch (Exception ex)
{
LogLocked(log, "Encountered error while updating cluster config: " + ex.Message);
log?.WriteLine($"Encountered error while updating cluster config: {ex.Message}");
return null;
}
}
......@@ -1721,16 +1748,16 @@ private void ResetAllNonConnected()
}
#pragma warning disable IDE0060
partial void OnTraceLog(TextWriter log, [CallerMemberName] string caller = null);
partial void OnTraceLog(LogProxy log, [CallerMemberName] string caller = null);
#pragma warning restore IDE0060
private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, ServerEndPoint[] servers, bool useTieBreakers, Task<string>[] tieBreakers, List<ServerEndPoint> masters)
private async Task<ServerEndPoint> NominatePreferredMaster(LogProxy log, ServerEndPoint[] servers, bool useTieBreakers, Task<string>[] tieBreakers, List<ServerEndPoint> masters)
{
Dictionary<string, int> uniques = null;
if (useTieBreakers)
{ // count the votes
uniques = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
LogLocked(log, "Waiting for tiebreakers...");
log?.WriteLine("Waiting for tiebreakers...");
await WaitAllIgnoreErrorsAsync(tieBreakers, 50, log).ForAwait();
for (int i = 0; i < tieBreakers.Length; i++)
{
......@@ -1742,25 +1769,25 @@ private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, Serve
string s = tieBreakers[i].Result;
if (string.IsNullOrWhiteSpace(s))
{
LogLocked(log, "{0} had no tiebreaker set", Format.ToString(ep));
log?.WriteLine($"{Format.ToString(ep)} had no tiebreaker set");
}
else
{
LogLocked(log, "{0} nominates: {1}", Format.ToString(ep), s);
log?.WriteLine($"{Format.ToString(ep)} nominates: {s}");
if (!uniques.TryGetValue(s, out int count)) count = 0;
uniques[s] = count + 1;
}
break;
case TaskStatus.Faulted:
LogLocked(log, "{0} failed to nominate ({1})", Format.ToString(ep), status);
log?.WriteLine($"{Format.ToString(ep)} failed to nominate ({status})");
foreach (var ex in tieBreakers[i].Exception.InnerExceptions)
{
if (ex.Message.StartsWith("MOVED ") || ex.Message.StartsWith("ASK ")) continue;
LogLocked(log, "> {0}", ex.Message);
log?.WriteLine("> " + ex.Message);
}
break;
default:
LogLocked(log, "{0} failed to nominate ({1})", Format.ToString(ep), status);
log?.WriteLine($"{Format.ToString(ep)} failed to nominate ({status})");
break;
}
}
......@@ -1769,37 +1796,37 @@ private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, Serve
switch (masters.Count)
{
case 0:
LogLocked(log, "No masters detected");
log?.WriteLine("No masters detected");
return null;
case 1:
LogLocked(log, "Single master detected: " + Format.ToString(masters[0].EndPoint));
log?.WriteLine($"Single master detected: {Format.ToString(masters[0].EndPoint)}");
return masters[0];
default:
LogLocked(log, "Multiple masters detected...");
log?.WriteLine("Multiple masters detected...");
if (useTieBreakers && uniques != null)
{
switch (uniques.Count)
{
case 0:
LogLocked(log, "nobody nominated a tie-breaker");
log?.WriteLine("nobody nominated a tie-breaker");
break;
case 1:
string unanimous = uniques.Keys.Single();
LogLocked(log, "tie-break is unanimous at {0}", unanimous);
log?.WriteLine($"tie-break is unanimous at {unanimous}");
var found = SelectServerByElection(servers, unanimous, log);
if (found != null)
{
LogLocked(log, "Elected: {0}", Format.ToString(found.EndPoint));
log?.WriteLine($"Elected: {Format.ToString(found.EndPoint)}");
return found;
}
break;
default:
LogLocked(log, "tie-break is contested:");
log?.WriteLine("tie-break is contested:");
ServerEndPoint highest = null;
bool arbitrary = false;
foreach (var pair in uniques.OrderByDescending(x => x.Value))
{
LogLocked(log, "{0} has {1} votes", pair.Key, pair.Value);
log?.WriteLine($"{pair.Key} has {pair.Value} votes");
if (highest == null)
{
highest = SelectServerByElection(servers, pair.Key, log);
......@@ -1814,11 +1841,11 @@ private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, Serve
{
if (arbitrary)
{
LogLocked(log, "Choosing master arbitrarily: {0}", Format.ToString(highest.EndPoint));
log?.WriteLine($"Choosing master arbitrarily: {Format.ToString(highest.EndPoint)}");
}
else
{
LogLocked(log, "Elected: {0}", Format.ToString(highest.EndPoint));
log?.WriteLine($"Elected: {Format.ToString(highest.EndPoint)}");
}
return highest;
}
......@@ -1828,11 +1855,11 @@ private async Task<ServerEndPoint> NominatePreferredMaster(TextWriter log, Serve
break;
}
LogLocked(log, "Choosing master arbitrarily: {0}", Format.ToString(masters[0].EndPoint));
log?.WriteLine($"Choosing master arbitrarily: {Format.ToString(masters[0].EndPoint)}");
return masters[0];
}
private ServerEndPoint SelectServerByElection(ServerEndPoint[] servers, string endpoint, TextWriter log)
private ServerEndPoint SelectServerByElection(ServerEndPoint[] servers, string endpoint, LogProxy log)
{
if (servers == null || string.IsNullOrWhiteSpace(endpoint)) return null;
for (int i = 0; i < servers.Length; i++)
......@@ -1840,13 +1867,13 @@ private ServerEndPoint SelectServerByElection(ServerEndPoint[] servers, string e
if (string.Equals(Format.ToString(servers[i].EndPoint), endpoint, StringComparison.OrdinalIgnoreCase))
return servers[i];
}
LogLocked(log, "...but we couldn't find that");
log?.WriteLine("...but we couldn't find that");
var deDottedEndpoint = DeDotifyHost(endpoint);
for (int i = 0; i < servers.Length; i++)
{
if (string.Equals(DeDotifyHost(Format.ToString(servers[i].EndPoint)), deDottedEndpoint, StringComparison.OrdinalIgnoreCase))
{
LogLocked(log, "...but we did find instead: {0}", deDottedEndpoint);
log?.WriteLine($"...but we did find instead: {deDottedEndpoint}");
return servers[i];
}
}
......
......@@ -20,7 +20,7 @@ partial class ConnectionMultiplexer
Debug.WriteLine(message, Environment.CurrentManagedThreadId + " ~ " + category);
}
partial void OnTraceLog(TextWriter log, string caller)
partial void OnTraceLog(LogProxy log, string caller)
{
lock (UniqueId)
{
......
......@@ -8,20 +8,21 @@
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis.Profiling;
using static StackExchange.Redis.ConnectionMultiplexer;
namespace StackExchange.Redis
{
internal sealed class LoggingMessage : Message
{
public readonly TextWriter log;
public readonly LogProxy log;
private readonly Message tail;
public static Message Create(TextWriter log, Message tail)
public static Message Create(LogProxy log, Message tail)
{
return log == null ? tail : new LoggingMessage(log, tail);
}
private LoggingMessage(TextWriter log, Message tail) : base(tail.Db, tail.Flags, tail.Command)
private LoggingMessage(LogProxy log, Message tail) : base(tail.Db, tail.Flags, tail.Command)
{
this.log = log;
this.tail = tail;
......@@ -39,14 +40,14 @@ protected override void WriteImpl(PhysicalConnection physical)
try
{
var bridge = physical.BridgeCouldBeNull;
bridge?.Multiplexer?.LogLocked(log, "Writing to {0}: {1}", bridge, tail.CommandAndKey);
log?.WriteLine($"Writing to {bridge}: {tail.CommandAndKey}");
}
catch { }
tail.WriteTo(physical);
}
public override int ArgCount => tail.ArgCount;
public TextWriter Log => log;
public LogProxy Log => log;
}
internal abstract class Message : ICompletable
......
......@@ -10,6 +10,7 @@
using Pipelines.Sockets.Unofficial;
using Pipelines.Sockets.Unofficial.Threading;
using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
using static StackExchange.Redis.ConnectionMultiplexer;
using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;
namespace StackExchange.Redis
......@@ -129,7 +130,7 @@ public void ReportNextFailure()
public override string ToString() => ConnectionType + "/" + Format.ToString(ServerEndPoint.EndPoint);
public void TryConnect(TextWriter log) => GetConnection(log);
public void TryConnect(LogProxy log) => GetConnection(log);
private WriteResult QueueOrFailMessage(Message message)
{
......@@ -380,7 +381,7 @@ internal void KeepAlive()
}
}
internal async Task OnConnectedAsync(PhysicalConnection connection, TextWriter log)
internal async Task OnConnectedAsync(PhysicalConnection connection, LogProxy log)
{
Trace("OnConnected");
if (physical == connection && !isDisposed && ChangeState(State.Connecting, State.ConnectedEstablishing))
......@@ -1097,7 +1098,7 @@ private bool ChangeState(State oldState, State newState)
return result;
}
private PhysicalConnection GetConnection(TextWriter log)
private PhysicalConnection GetConnection(LogProxy log)
{
if (state == (int)State.Disconnected)
{
......@@ -1105,7 +1106,7 @@ private PhysicalConnection GetConnection(TextWriter log)
{
if (!Multiplexer.IsDisposed)
{
Multiplexer.LogLocked(log, "Connecting {0}...", Name);
log?.WriteLine($"Connecting {Name}...");
Multiplexer.Trace("Connecting...", Name);
if (ChangeState(State.Disconnected, State.Connecting))
{
......@@ -1122,7 +1123,7 @@ private PhysicalConnection GetConnection(TextWriter log)
}
catch (Exception ex)
{
Multiplexer.LogLocked(log, "Connect {0} failed: {1}", Name, ex.Message);
log?.WriteLine($"Connect {Name} failed: {ex.Message}");
Multiplexer.Trace("Connect failed: " + ex.Message, Name);
ChangeState(State.Disconnected);
OnInternalError(ex);
......
......@@ -18,6 +18,7 @@
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;
using Pipelines.Sockets.Unofficial.Arenas;
using static StackExchange.Redis.ConnectionMultiplexer;
namespace StackExchange.Redis
{
......@@ -85,7 +86,7 @@ public PhysicalConnection(PhysicalBridge bridge)
OnCreateEcho();
}
internal async Task BeginConnectAsync(TextWriter log)
internal async Task BeginConnectAsync(LogProxy log)
{
var bridge = BridgeCouldBeNull;
var endpoint = bridge?.ServerEndPoint?.EndPoint;
......@@ -97,7 +98,7 @@ internal async Task BeginConnectAsync(TextWriter log)
Trace("Connecting...");
_socket = SocketManager.CreateSocket(endpoint);
bridge.Multiplexer.OnConnecting(endpoint, bridge.ConnectionType);
bridge.Multiplexer.LogLocked(log, "BeginConnect: {0}", Format.ToString(endpoint));
log?.WriteLine($"BeginConnect: {Format.ToString(endpoint)}");
CancellationTokenSource timeoutSource = null;
try
......@@ -141,7 +142,7 @@ internal async Task BeginConnectAsync(TextWriter log)
}
else if (await ConnectedAsync(x, log, bridge.Multiplexer.SocketManager).ForAwait())
{
bridge.Multiplexer.LogLocked(log, "Starting read");
log?.WriteLine("Starting read");
try
{
StartReading();
......@@ -161,7 +162,7 @@ internal async Task BeginConnectAsync(TextWriter log)
}
catch (ObjectDisposedException)
{
bridge.Multiplexer.LogLocked(log, "(socket shutdown)");
log?.WriteLine("(socket shutdown)");
try { RecordConnectionFailed(ConnectionFailureType.UnableToConnect, isInitialConnect: true); }
catch (Exception inner)
{
......@@ -1251,7 +1252,7 @@ private static LocalCertificateSelectionCallback GetAmbientClientCertificateCall
return null;
}
internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, SocketManager manager)
internal async ValueTask<bool> ConnectedAsync(Socket socket, LogProxy log, SocketManager manager)
{
var bridge = BridgeCouldBeNull;
if (bridge == null) return false;
......@@ -1270,7 +1271,7 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc
if (config.Ssl)
{
bridge.Multiplexer.LogLocked(log, "Configuring SSL");
log?.WriteLine("Configuring TLS");
var host = config.SslHost;
if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(bridge.ServerEndPoint.EndPoint);
......@@ -1290,7 +1291,7 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc
bridge.Multiplexer?.SetAuthSuspect();
throw;
}
bridge.Multiplexer.LogLocked(log, $"SSL connection established successfully using protocol: {ssl.SslProtocol}");
log?.WriteLine($"TLS connection established successfully using protocol: {ssl.SslProtocol}");
}
catch (AuthenticationException authexception)
{
......@@ -1308,7 +1309,7 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc
_ioPipe = pipe;
bridge.Multiplexer.LogLocked(log, "Connected {0}", bridge);
log?.WriteLine($"Connected {bridge}");
await bridge.OnConnectedAsync(this, log).ForAwait();
return true;
......
......@@ -6,6 +6,7 @@
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using static StackExchange.Redis.ConnectionMultiplexer;
#pragma warning disable RCS1231 // Make parameter ref read-only.
......@@ -320,7 +321,10 @@ public Task<DateTime> LastSaveAsync(CommandFlags flags = CommandFlags.None)
public void MakeMaster(ReplicationChangeOptions options, TextWriter log = null)
{
multiplexer.MakeMaster(server, options, log);
using (var proxy = LogProxy.TryCreate(log))
{
multiplexer.MakeMaster(server, options, proxy);
}
}
public void Save(SaveType type, CommandFlags flags = CommandFlags.None)
......
......@@ -175,7 +175,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
{
try
{
bridge?.Multiplexer?.LogLocked(logging.Log, "Response from {0} / {1}: {2}", bridge, message.CommandAndKey, result);
logging.Log?.WriteLine($"Response from {bridge} / {message.CommandAndKey}: {result}");
}
catch { }
}
......
......@@ -9,6 +9,7 @@
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using static StackExchange.Redis.ConnectionMultiplexer;
using static StackExchange.Redis.PhysicalBridge;
namespace StackExchange.Redis
......@@ -155,7 +156,7 @@ public void Dispose()
tmp?.Dispose();
}
public PhysicalBridge GetBridge(ConnectionType type, bool create = true, TextWriter log = null)
public PhysicalBridge GetBridge(ConnectionType type, bool create = true, LogProxy log = null)
{
if (isDisposed) return null;
switch (type)
......@@ -237,7 +238,7 @@ public void SetUnselectable(UnselectableFlags flags)
public ValueTask<WriteResult> TryWriteAsync(Message message) => GetBridge(message.Command)?.TryWriteAsync(message, isSlave) ?? new ValueTask<WriteResult>(WriteResult.NoConnectionAvailable);
internal void Activate(ConnectionType type, TextWriter log)
internal void Activate(ConnectionType type, LogProxy log)
{
GetBridge(type, true, log);
}
......@@ -467,7 +468,7 @@ internal bool IsSelectable(RedisCommand command, bool allowDisconnected = false)
return bridge != null && (allowDisconnected || bridge.IsConnected);
}
internal Task OnEstablishingAsync(PhysicalConnection connection, TextWriter log)
internal Task OnEstablishingAsync(PhysicalConnection connection, LogProxy log)
{
try
{
......@@ -624,7 +625,7 @@ internal void ReportNextFailure()
subscription?.ReportNextFailure();
}
internal Task<bool> SendTracer(TextWriter log = null)
internal Task<bool> SendTracer(LogProxy log = null)
{
var msg = GetTracerMessage(false);
msg = LoggingMessage.Create(log, msg);
......@@ -727,7 +728,7 @@ internal void WriteDirectOrQueueFireAndForgetSync<T>(PhysicalConnection connecti
}
}
private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
private PhysicalBridge CreateBridge(ConnectionType type, LogProxy log)
{
if (Multiplexer.IsDisposed) return null;
Multiplexer.Trace(type.ToString());
......@@ -736,9 +737,9 @@ private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
return bridge;
}
private async Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
private async Task HandshakeAsync(PhysicalConnection connection, LogProxy log)
{
Multiplexer.LogLocked(log, "Server handshake");
log?.WriteLine("Server handshake");
if (connection == null)
{
Multiplexer.Trace("No connection!?");
......@@ -748,7 +749,7 @@ private async Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
string password = Multiplexer.RawConfig.Password;
if (!string.IsNullOrWhiteSpace(password))
{
Multiplexer.LogLocked(log, "Authenticating (password)");
log?.WriteLine("Authenticating (password)");
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.AUTH, (RedisValue)password);
msg.SetInternalCall();
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.DemandOK).ForAwait();
......@@ -762,7 +763,7 @@ private async Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
name = nameSanitizer.Replace(name, "");
if (!string.IsNullOrWhiteSpace(name))
{
Multiplexer.LogLocked(log, "Setting client name: {0}", name);
log?.WriteLine($"Setting client name: {name}");
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.CLIENT, RedisLiterals.SETNAME, (RedisValue)name);
msg.SetInternalCall();
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.DemandOK).ForAwait();
......@@ -779,10 +780,10 @@ private async Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
if (connType == ConnectionType.Interactive)
{
Multiplexer.LogLocked(log, "Auto-configure...");
log?.WriteLine("Auto-configure...");
AutoConfigure(connection);
}
Multiplexer.LogLocked(log, "Sending critical tracer: {0}", bridge);
log?.WriteLine($"Sending critical tracer: {bridge}");
var tracer = GetTracerMessage(true);
tracer = LoggingMessage.Create(log, tracer);
await WriteDirectOrQueueFireAndForgetAsync(connection, tracer, ResultProcessor.EstablishConnection).ForAwait();
......@@ -798,7 +799,7 @@ private async Task HandshakeAsync(PhysicalConnection connection, TextWriter log)
await WriteDirectOrQueueFireAndForgetAsync(connection, msg, ResultProcessor.TrackSubscriptions).ForAwait();
}
}
Multiplexer.LogLocked(log, "Flushing outbound buffer");
log?.WriteLine("Flushing outbound buffer");
await connection.FlushAsync().ForAwait();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment