Commit 34223d82 authored by Marc Gravell's avatar Marc Gravell

add TEST tracers when connecting and closing; prefer PING on subscribers;...

add TEST tracers when connecting and closing; prefer PING on subscribers; avoid NRE in connect; set no-linger
parent d488e815
......@@ -53,6 +53,9 @@ internal static ConnectionMultiplexer GetWithFT(ITestOutputHelper output)
};
var conn = ConnectionMultiplexer.Connect(options);
conn.MessageFaulted += (msg, ex, origin) => output.WriteLine($"Faulted from '{origin}': '{msg}' - '{(ex == null ? "(null)" : ex.Message)}'");
conn.Connecting += (e, t) => output.WriteLine($"Connecting to {Format.ToString(e)} as {t}");
conn.Closing += complete => output.WriteLine(complete ? "Closed" : "Closing...");
var server = conn.GetServer(ep);
var arr = (RedisResult[])server.Execute("module", "list");
bool found = false;
......
......@@ -260,6 +260,8 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer)
muxer.InternalError += OnInternalError;
muxer.ConnectionFailed += OnConnectionFailed;
muxer.MessageFaulted += (msg, ex, origin) => Writer?.WriteLine($"Faulted from '{origin}': '{msg}' - '{(ex == null ? "(null)" : ex.Message)}'");
muxer.Connecting += (e, t) => Writer.WriteLine($"Connecting to {Format.ToString(e)} as {t}");
muxer.Closing += complete => Writer.WriteLine(complete ? "Closed" : "Closing...");
return muxer;
}
......
{
"methodDisplay": "classAndMethod",
"maxParallelThreads": 3,
"maxParallelThreads": 8,
"diagnosticMessages": false,
"longRunningTestSeconds": 60
}
\ No newline at end of file
......@@ -1949,6 +1949,9 @@ public bool IsConnecting
/// <param name="allowCommandsToComplete">Whether to allow all in-queue commands to complete first.</param>
public void Close(bool allowCommandsToComplete = true)
{
if (_isDisposed) return;
OnClosing(false);
_isDisposed = true;
_profilingSessionProvider = null;
using (var tmp = pulse)
......@@ -1963,6 +1966,7 @@ public void Close(bool allowCommandsToComplete = true)
}
DisposeAndClearServers();
OnCloseReaderWriter();
OnClosing(true);
}
partial void OnCloseReaderWriter();
......
......@@ -242,7 +242,12 @@ internal void KeepAlive()
msg.SetSource(ResultProcessor.Tracer, null);
break;
case ConnectionType.Subscription:
if (commandMap.IsAvailable(RedisCommand.UNSUBSCRIBE))
if (commandMap.IsAvailable(RedisCommand.PING) && ServerEndPoint.GetFeatures().PingOnSubscriber)
{
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.PING);
msg.SetSource(ResultProcessor.Tracer, null);
}
else if (commandMap.IsAvailable(RedisCommand.UNSUBSCRIBE))
{
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.UNSUBSCRIBE,
(RedisChannel)Guid.NewGuid().ToByteArray());
......
......@@ -83,7 +83,7 @@ internal async void BeginConnectAsync(TextWriter log)
Trace("Connecting...");
_socket = SocketManager.CreateSocket(endpoint);
bridge.Multiplexer.OnConnecting(endpoint, bridge.ConnectionType);
bridge.Multiplexer.LogLocked(log, "BeginConnect: {0}", Format.ToString(endpoint));
CancellationTokenSource timeoutSource = null;
......@@ -99,7 +99,12 @@ internal async void BeginConnectAsync(TextWriter log)
{
_socketArgs.Completed += SocketAwaitable.Callback;
if (_socket.ConnectAsync(_socketArgs))
var x = _socket;
if (x == null)
{
awaitable.TryComplete(0, SocketError.ConnectionAborted);
}
else if (x.ConnectAsync(_socketArgs))
{ // asynchronous operation is pending
timeoutSource = ConfigureTimeout(_socketArgs, bridge.Multiplexer.RawConfig.ConnectTimeout);
}
......@@ -121,7 +126,12 @@ internal async void BeginConnectAsync(TextWriter log)
timeoutSource.Cancel();
timeoutSource.Dispose();
}
if (await ConnectedAsync(_socket, log, bridge.Multiplexer.SocketManager).ForAwait())
var x = _socket;
if (x == null)
{
ConnectionMultiplexer.TraceWithoutContext("Socket was already aborted");
}
else if (await ConnectedAsync(x, log, bridge.Multiplexer.SocketManager).ForAwait())
{
bridge.Multiplexer.LogLocked(log, "Starting read");
try
......@@ -276,8 +286,15 @@ public Task FlushAsync()
}
public void RecordConnectionFailed(ConnectionFailureType failureType, Exception innerException = null, [CallerMemberName] string origin = null,
bool isInitialConnect = false, IDuplexPipe connectingPipe = null)
bool isInitialConnect = false, IDuplexPipe connectingPipe = null
#if TEST
, [CallerFilePath] string path = default, [CallerLineNumber] int line = default
#endif
)
{
#if TEST
origin += $" ({path}#{line})";
#endif
Exception outerException = innerException;
IdentifyFailureType(innerException, ref failureType);
var bridge = BridgeCouldBeNull;
......
......@@ -61,6 +61,7 @@ public RawResult(RawResult[] itemsOversized, int itemCount)
internal bool IsNull => (_type & NonNullFlag) == 0;
public bool HasValue => Type != ResultType.None;
public override string ToString()
{
if (IsNull) return "(null)";
......
......@@ -156,6 +156,11 @@ public RedisFeatures(Version version)
/// </summary>
public bool Geo => Version >= v3_2_0;
/// <summary>
/// Can PING be used on a subscription connection?
/// </summary>
internal bool PingOnSubscriber => Version >= v3_2_0;
/// <summary>
/// Does SetPop support popping multiple items?
/// </summary>
......
......@@ -251,11 +251,21 @@ internal bool Validate(ConnectionMultiplexer multiplexer, RedisChannel channel)
#if TEST
internal event Action<string, Exception, string> MessageFaulted;
internal event Action<bool> Closing;
internal event Action<EndPoint, ConnectionType> Connecting;
#else
internal event Action<string, Exception, string> MessageFaulted
{ // completely empty shell event, just to keep the test suite compiling
add { } remove { }
}
internal event Action<bool> Closing
{ // completely empty shell event, just to keep the test suite compiling
add { } remove { }
}
internal event Action<EndPoint, ConnectionType> Connecting
{ // completely empty shell event, just to keep the test suite compiling
add { } remove { }
}
#endif
[Conditional("TEST")]
......@@ -263,6 +273,22 @@ internal void OnMessageFaulted(Message msg, Exception fault, [CallerMemberName]
{
#if TEST
MessageFaulted?.Invoke(msg?.CommandAndKey, fault, $"{origin} ({path}#{lineNumber})");
#endif
}
[Conditional("TEST")]
internal void OnClosing(bool complete)
{
#if TEST
Closing?.Invoke(complete);
#endif
}
[Conditional("TEST")]
internal void OnConnecting(EndPoint endpoint, ConnectionType connectionType)
{
#if TEST
Connecting?.Invoke(endpoint, connectionType);
#endif
}
}
......
......@@ -1827,7 +1827,28 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
happy = result.Type == ResultType.BulkString && (!establishConnection || result.IsEqual(connection.BridgeCouldBeNull?.Multiplexer?.UniqueId));
break;
case RedisCommand.PING:
happy = result.Type == ResultType.SimpleString && result.IsEqual(CommonReplies.PONG);
// there are two different PINGs; "interactive" is a +PONG or +{your message},
// but subscriber returns a bulk-array of [ "pong", {your message} ]
switch (result.Type)
{
case ResultType.SimpleString:
happy = result.IsEqual(CommonReplies.PONG);
break;
case ResultType.MultiBulk:
if (result.ItemsCount == 2)
{
var items = result.GetItems();
happy = items[0].IsEqual(CommonReplies.PONG) && items[1].Payload.IsEmpty;
}
else
{
happy = false;
}
break;
default:
happy = false;
break;
}
break;
case RedisCommand.TIME:
happy = result.Type == ResultType.MultiBulk && result.GetItems().Length == 2;
......@@ -1836,7 +1857,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
happy = result.Type == ResultType.Integer;
break;
default:
happy = true;
happy = false;
break;
}
if (happy)
......@@ -1847,7 +1868,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure,
new InvalidOperationException($"unexpected tracer reply to {message.Command}: {result.ToString()}"));
return false;
}
}
......
......@@ -182,10 +182,7 @@ public PhysicalBridge GetBridge(RedisCommand command, bool create = true)
}
}
public RedisFeatures GetFeatures()
{
return new RedisFeatures(version);
}
public RedisFeatures GetFeatures() => new RedisFeatures(version);
public void SetClusterConfiguration(ClusterConfiguration configuration)
{
......
......@@ -133,6 +133,7 @@ internal static Socket CreateSocket(EndPoint endpoint)
var protocolType = addressFamily == AddressFamily.Unix ? ProtocolType.Unspecified : ProtocolType.Tcp;
var socket = new Socket(addressFamily, SocketType.Stream, protocolType);
SocketConnection.SetRecommendedClientOptions(socket);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger, false);
return socket;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment