Commit 3c6e6821 authored by Marc Gravell's avatar Marc Gravell

tell us what heartbeats happen, before they happen

parent 409edc0a
...@@ -276,6 +276,7 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer) ...@@ -276,6 +276,7 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer)
}; };
muxer.Connecting += (e, t) => Writer.WriteLine($"Connecting to {Format.ToString(e)} as {t}"); muxer.Connecting += (e, t) => Writer.WriteLine($"Connecting to {Format.ToString(e)} as {t}");
muxer.TransactionLog += msg => { lock (Writer) { Writer.WriteLine("tran: " + msg); } }; muxer.TransactionLog += msg => { lock (Writer) { Writer.WriteLine("tran: " + msg); } };
muxer.Heartbeat += msg => Writer.WriteLine($"{Time()}: {msg}");
muxer.Resurrecting += (e, t) => Writer.WriteLine($"Resurrecting {Format.ToString(e)} as {t}"); muxer.Resurrecting += (e, t) => Writer.WriteLine($"Resurrecting {Format.ToString(e)} as {t}");
muxer.Closing += complete => Writer.WriteLine(complete ? "Closed" : "Closing..."); muxer.Closing += complete => Writer.WriteLine(complete ? "Closed" : "Closing...");
return muxer; return muxer;
......
...@@ -22,7 +22,7 @@ public async Task Exec() ...@@ -22,7 +22,7 @@ public async Task Exec()
await Task.Delay(7000).ForAwait(); await Task.Delay(7000).ForAwait();
var after = conn.GetCounters(); var after = conn.GetCounters();
int done = (int)(after.Interactive.CompletedSynchronously - before.Interactive.CompletedSynchronously); int done = (int)(after.Interactive.CompletedSynchronously - before.Interactive.CompletedSynchronously);
Assert.True(done >= 2); Assert.True(done >= 2, $"expected >=2, got {done}");
} }
} }
} }
......
...@@ -237,6 +237,7 @@ internal void KeepAlive() ...@@ -237,6 +237,7 @@ internal void KeepAlive()
{ {
var commandMap = Multiplexer.CommandMap; var commandMap = Multiplexer.CommandMap;
Message msg = null; Message msg = null;
var features = ServerEndPoint.GetFeatures();
switch (ConnectionType) switch (ConnectionType)
{ {
case ConnectionType.Interactive: case ConnectionType.Interactive:
...@@ -244,7 +245,7 @@ internal void KeepAlive() ...@@ -244,7 +245,7 @@ internal void KeepAlive()
msg.SetSource(ResultProcessor.Tracer, null); msg.SetSource(ResultProcessor.Tracer, null);
break; break;
case ConnectionType.Subscription: case ConnectionType.Subscription:
if (commandMap.IsAvailable(RedisCommand.PING) && ServerEndPoint.GetFeatures().PingOnSubscriber) if (commandMap.IsAvailable(RedisCommand.PING) && features.PingOnSubscriber)
{ {
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.PING); msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.PING);
msg.SetSource(ResultProcessor.Tracer, null); msg.SetSource(ResultProcessor.Tracer, null);
...@@ -261,6 +262,7 @@ internal void KeepAlive() ...@@ -261,6 +262,7 @@ internal void KeepAlive()
{ {
msg.SetInternalCall(); msg.SetInternalCall();
Multiplexer.Trace("Enqueue: " + msg); Multiplexer.Trace("Enqueue: " + msg);
Multiplexer.OnHeartbeat($"heartbeat '{msg.CommandAndKey}' on '{PhysicalName}' (v{features.Version})");
var result = TryWrite(msg, ServerEndPoint.IsSlave); var result = TryWrite(msg, ServerEndPoint.IsSlave);
if (result != WriteResult.Success) if (result != WriteResult.Success)
......
...@@ -254,7 +254,7 @@ internal string GetConnectionName(EndPoint endPoint, ConnectionType connectionTy ...@@ -254,7 +254,7 @@ internal string GetConnectionName(EndPoint endPoint, ConnectionType connectionTy
internal event Action<string, Exception, string> MessageFaulted; internal event Action<string, Exception, string> MessageFaulted;
internal event Action<bool> Closing; internal event Action<bool> Closing;
internal event Action<string> PreTransactionExec, TransactionLog; internal event Action<string> PreTransactionExec, TransactionLog, Heartbeat;
internal event Action<EndPoint, ConnectionType> Connecting; internal event Action<EndPoint, ConnectionType> Connecting;
internal event Action<EndPoint, ConnectionType> Resurrecting; internal event Action<EndPoint, ConnectionType> Resurrecting;
...@@ -262,6 +262,10 @@ internal void OnMessageFaulted(Message msg, Exception fault, [CallerMemberName] ...@@ -262,6 +262,10 @@ internal void OnMessageFaulted(Message msg, Exception fault, [CallerMemberName]
{ {
MessageFaulted?.Invoke(msg?.CommandAndKey, fault, $"{origin} ({path}#{lineNumber})"); MessageFaulted?.Invoke(msg?.CommandAndKey, fault, $"{origin} ({path}#{lineNumber})");
} }
internal void OnHeartbeat(string message)
{
Heartbeat?.Invoke(message);
}
internal void OnClosing(bool complete) internal void OnClosing(bool complete)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment