Commit e75d0746 authored by Marc Gravell's avatar Marc Gravell

find+fix more ways of not competing tasks; better tracing of message faults...

find+fix more ways of not competing tasks; better tracing of message faults via new MessageFaulted event (requires `internal` access and a build flag); improve ManyContexts (test) stability
parent 1fcf91d6
...@@ -52,7 +52,10 @@ internal static ConnectionMultiplexer GetWithFT(ITestOutputHelper output) ...@@ -52,7 +52,10 @@ internal static ConnectionMultiplexer GetWithFT(ITestOutputHelper output)
SyncTimeout = 15000, SyncTimeout = 15000,
}; };
var conn = ConnectionMultiplexer.Connect(options); var conn = ConnectionMultiplexer.Connect(options);
conn.MessageFaulted += (msg, ex, origin) =>
{
output.WriteLine($"Faulted from '{origin}': '{msg}' - {ex.Message}");
};
var server = conn.GetServer(ep); var server = conn.GetServer(ep);
var arr = (RedisResult[])server.Execute("module", "list"); var arr = (RedisResult[])server.Execute("module", "list");
bool found = false; bool found = false;
......
...@@ -100,8 +100,22 @@ public void SocketFailureError() ...@@ -100,8 +100,22 @@ public void SocketFailureError()
Assert.Equal(ConnectionFailureType.UnableToResolvePhysicalConnection, outer.FailureType); Assert.Equal(ConnectionFailureType.UnableToResolvePhysicalConnection, outer.FailureType);
Assert.NotNull(outer.InnerException); Assert.NotNull(outer.InnerException);
var inner = Assert.IsType<RedisConnectionException>(outer.InnerException); if (outer.InnerException is RedisConnectionException rce)
Assert.Equal(ConnectionFailureType.UnableToConnect, inner.FailureType); {
Assert.Equal(ConnectionFailureType.UnableToConnect, rce.FailureType);
}
else
{
Writer.WriteLine(outer.InnerException.ToString());
if (outer.InnerException is AggregateException inner)
{
foreach (var ex in inner.InnerExceptions)
{
Writer.WriteLine(ex.ToString());
}
}
Assert.False(true); // force fail
}
} }
#if DEBUG // needs AllowConnect, which is DEBUG only #if DEBUG // needs AllowConnect, which is DEBUG only
......
...@@ -150,18 +150,18 @@ public void ManyContexts() ...@@ -150,18 +150,18 @@ public void ManyContexts()
{ {
using (var conn = Create()) using (var conn = Create())
{ {
var profiler = new PerThreadProfiler(); var profiler = new AsyncLocalProfiler();
var prefix = Me(); var prefix = Me();
conn.RegisterProfiler(profiler.GetSession); conn.RegisterProfiler(profiler.GetSession);
var threads = new List<Thread>(); var tasks = new Task[16];
var results = new IEnumerable<IProfiledCommand>[16]; var results = new ProfiledCommandEnumerable[tasks.Length];
for (var i = 0; i < 16; i++) for (var i = 0; i < tasks.Length; i++)
{ {
var ix = i; var ix = i;
var thread = new Thread(() => var task = Task.Run(async () =>
{ {
var db = conn.GetDatabase(ix); var db = conn.GetDatabase(ix);
...@@ -169,25 +169,23 @@ public void ManyContexts() ...@@ -169,25 +169,23 @@ public void ManyContexts()
for (var j = 0; j < 1000; j++) for (var j = 0; j < 1000; j++)
{ {
allTasks.Add(db.StringGetAsync(prefix + ix)); var g = db.StringGetAsync(prefix + ix);
allTasks.Add(db.StringSetAsync(prefix + ix, "world" + ix)); var s = db.StringSetAsync(prefix + ix, "world" + ix);
// overlap the g+s, just for fun
await g;
await s;
} }
Task.WaitAll(allTasks.ToArray());
results[ix] = profiler.GetSession().FinishProfiling(); results[ix] = profiler.GetSession().FinishProfiling();
}); });
tasks[ix] = task;
threads.Add(thread);
} }
Task.WhenAll(tasks).Wait();
threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());
for (var i = 0; i < results.Length; i++) for (var i = 0; i < results.Length; i++)
{ {
var res = results[i]; var res = results[i];
Assert.NotNull(res);
var numGets = res.Count(r => r.Command == "GET"); var numGets = res.Count(r => r.Command == "GET");
var numSets = res.Count(r => r.Command == "SET"); var numSets = res.Count(r => r.Command == "SET");
......
...@@ -103,7 +103,10 @@ public async Task ConnectToSSLServer(bool useSsl, bool specifyHost) ...@@ -103,7 +103,10 @@ public async Task ConnectToSSLServer(bool useSsl, bool specifyHost)
using (var muxer = ConnectionMultiplexer.Connect(config, log)) using (var muxer = ConnectionMultiplexer.Connect(config, log))
{ {
Log("Connect log:"); Log("Connect log:");
Log(log.ToString()); lock (log)
{
Log(log.ToString());
}
Log("===="); Log("====");
muxer.ConnectionFailed += OnConnectionFailed; muxer.ConnectionFailed += OnConnectionFailed;
muxer.InternalError += OnInternalError; muxer.InternalError += OnInternalError;
......
...@@ -259,6 +259,7 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer) ...@@ -259,6 +259,7 @@ protected IServer GetAnyMaster(ConnectionMultiplexer muxer)
} }
muxer.InternalError += OnInternalError; muxer.InternalError += OnInternalError;
muxer.ConnectionFailed += OnConnectionFailed; muxer.ConnectionFailed += OnConnectionFailed;
muxer.MessageFaulted += (msg, ex, origin) => Writer?.WriteLine($"Faulted from '{origin}': '{msg}' - '{(ex == null ? "(null)" : ex.Message)}'");
return muxer; return muxer;
} }
......
...@@ -11,6 +11,9 @@ ...@@ -11,6 +11,9 @@
<SignAssembly>true</SignAssembly> <SignAssembly>true</SignAssembly>
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign> <PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
<LangVersion>latest</LangVersion> <LangVersion>latest</LangVersion>
<!-- we should preferably not include this in release builds, but isn't dramatic -->
<DefineConstants>$(DefineConstants);TEST</DefineConstants>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition="'$(Configuration)' == 'Debug' and '$(Computername)'=='OCHO'"> <PropertyGroup Condition="'$(Configuration)' == 'Debug' and '$(Computername)'=='OCHO'">
......
...@@ -4,3 +4,4 @@ ...@@ -4,3 +4,4 @@
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("StackExchange.Redis.Server, PublicKey=00240000048000009400000006020000002400005253413100040000010001007791a689e9d8950b44a9a8886baad2ea180e7a8a854f158c9b98345ca5009cdd2362c84f368f1c3658c132b3c0f74e44ff16aeb2e5b353b6e0fe02f923a050470caeac2bde47a2238a9c7125ed7dab14f486a5a64558df96640933b9f2b6db188fc4a820f96dce963b662fa8864adbff38e5b4542343f162ecdc6dad16912fff")] [assembly: InternalsVisibleTo("StackExchange.Redis.Server, PublicKey=00240000048000009400000006020000002400005253413100040000010001007791a689e9d8950b44a9a8886baad2ea180e7a8a854f158c9b98345ca5009cdd2362c84f368f1c3658c132b3c0f74e44ff16aeb2e5b353b6e0fe02f923a050470caeac2bde47a2238a9c7125ed7dab14f486a5a64558df96640933b9f2b6db188fc4a820f96dce963b662fa8864adbff38e5b4542343f162ecdc6dad16912fff")]
[assembly: InternalsVisibleTo("StackExchange.Redis.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007791a689e9d8950b44a9a8886baad2ea180e7a8a854f158c9b98345ca5009cdd2362c84f368f1c3658c132b3c0f74e44ff16aeb2e5b353b6e0fe02f923a050470caeac2bde47a2238a9c7125ed7dab14f486a5a64558df96640933b9f2b6db188fc4a820f96dce963b662fa8864adbff38e5b4542343f162ecdc6dad16912fff")] [assembly: InternalsVisibleTo("StackExchange.Redis.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007791a689e9d8950b44a9a8886baad2ea180e7a8a854f158c9b98345ca5009cdd2362c84f368f1c3658c132b3c0f74e44ff16aeb2e5b353b6e0fe02f923a050470caeac2bde47a2238a9c7125ed7dab14f486a5a64558df96640933b9f2b6db188fc4a820f96dce963b662fa8864adbff38e5b4542343f162ecdc6dad16912fff")]
[assembly: InternalsVisibleTo("NRediSearch.Test, PublicKey=00240000048000009400000006020000002400005253413100040000010001007791a689e9d8950b44a9a8886baad2ea180e7a8a854f158c9b98345ca5009cdd2362c84f368f1c3658c132b3c0f74e44ff16aeb2e5b353b6e0fe02f923a050470caeac2bde47a2238a9c7125ed7dab14f486a5a64558df96640933b9f2b6db188fc4a820f96dce963b662fa8864adbff38e5b4542343f162ecdc6dad16912fff")]
...@@ -27,7 +27,7 @@ internal static void CompleteSyncOrAsyncImpl(CompletionManager manager, IComplet ...@@ -27,7 +27,7 @@ internal static void CompleteSyncOrAsyncImpl(CompletionManager manager, IComplet
private long completedSync, completedAsync, failedAsync; private long completedSync, completedAsync, failedAsync;
public CompletionManager(ConnectionMultiplexer multiplexer, string name) public CompletionManager(ConnectionMultiplexer multiplexer, string name)
{ {
this.multiplexer = multiplexer; this.multiplexer = multiplexer ?? throw new ArgumentNullException(nameof(multiplexer));
this.name = name; this.name = name;
} }
...@@ -48,7 +48,7 @@ private void PerInstanceCompleteSyncOrAsync(ICompletable operation) ...@@ -48,7 +48,7 @@ private void PerInstanceCompleteSyncOrAsync(ICompletable operation)
else else
{ {
multiplexer.Trace("Using thread-pool for asynchronous completion", name); multiplexer.Trace("Using thread-pool for asynchronous completion", name);
multiplexer.SocketManager.ScheduleTask(s_AnyOrderCompletionHandler, operation); (multiplexer.SocketManager ?? SocketManager.Shared).ScheduleTask(s_AnyOrderCompletionHandler, operation);
Interlocked.Increment(ref completedAsync); // k, *technically* we haven't actually completed this yet, but: close enough Interlocked.Increment(ref completedAsync); // k, *technically* we haven't actually completed this yet, but: close enough
} }
} }
......
...@@ -597,6 +597,7 @@ internal bool ComputeResult(PhysicalConnection connection, RawResult result) ...@@ -597,6 +597,7 @@ internal bool ComputeResult(PhysicalConnection connection, RawResult result)
} }
catch (Exception ex) catch (Exception ex)
{ {
connection?.BridgeCouldBeNull?.Multiplexer?.OnMessageFaulted(this, ex);
box?.SetException(ex); box?.SetException(ex);
return box != null; // we still want to pulse/complete return box != null; // we still want to pulse/complete
} }
......
...@@ -343,7 +343,8 @@ private void AbandonPendingBacklog(Exception ex) ...@@ -343,7 +343,8 @@ private void AbandonPendingBacklog(Exception ex)
next = DequeueNextPendingBacklog(); next = DequeueNextPendingBacklog();
if(next != null) if(next != null)
{ {
next.SetException(ex); Multiplexer?.OnMessageFaulted(next, ex);
next.SetException(ex);
this.CompleteSyncOrAsync(next); this.CompleteSyncOrAsync(next);
} }
} while (next != null); } while (next != null);
......
...@@ -280,17 +280,17 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -280,17 +280,17 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
Exception outerException = innerException; Exception outerException = innerException;
IdentifyFailureType(innerException, ref failureType); IdentifyFailureType(innerException, ref failureType);
var bridge = BridgeCouldBeNull;
if (_ioPipe != null || isInitialConnect) // if *we* didn't burn the pipe: flag it if (_ioPipe != null || isInitialConnect) // if *we* didn't burn the pipe: flag it
{ {
if (failureType == ConnectionFailureType.InternalFailure) OnInternalError(innerException, origin); if (failureType == ConnectionFailureType.InternalFailure) OnInternalError(innerException, origin);
// stop anything new coming in... // stop anything new coming in...
BridgeCouldBeNull?.Trace("Failed: " + failureType); bridge?.Trace("Failed: " + failureType);
int @in = -1; int @in = -1;
PhysicalBridge.State oldState = PhysicalBridge.State.Disconnected; PhysicalBridge.State oldState = PhysicalBridge.State.Disconnected;
bool isCurrent = false; bool isCurrent = false;
BridgeCouldBeNull?.OnDisconnected(failureType, this, out isCurrent, out oldState); bridge?.OnDisconnected(failureType, this, out isCurrent, out oldState);
if (oldState == PhysicalBridge.State.ConnectedEstablished) if (oldState == PhysicalBridge.State.ConnectedEstablished)
{ {
try try
...@@ -311,7 +311,6 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -311,7 +311,6 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
var data = new List<Tuple<string, string>>(); var data = new List<Tuple<string, string>>();
if (IncludeDetailInExceptions) if (IncludeDetailInExceptions)
{ {
var bridge = BridgeCouldBeNull;
if (bridge != null) if (bridge != null)
{ {
exMessage.Append(" on ").Append(Format.ToString(bridge.ServerEndPoint?.EndPoint)).Append("/").Append(connectionType); exMessage.Append(" on ").Append(Format.ToString(bridge.ServerEndPoint?.EndPoint)).Append("/").Append(connectionType);
...@@ -354,19 +353,24 @@ void add(string lk, string sk, string v) ...@@ -354,19 +353,24 @@ void add(string lk, string sk, string v)
outerException.Data["Redis-" + kv.Item1] = kv.Item2; outerException.Data["Redis-" + kv.Item1] = kv.Item2;
} }
BridgeCouldBeNull?.OnConnectionFailed(this, failureType, outerException); bridge?.OnConnectionFailed(this, failureType, outerException);
} }
} }
// cleanup // cleanup
lock (_writtenAwaitingResponse) lock (_writtenAwaitingResponse)
{ {
BridgeCouldBeNull?.Trace(_writtenAwaitingResponse.Count != 0, "Failing outstanding messages: " + _writtenAwaitingResponse.Count); bridge?.Trace(_writtenAwaitingResponse.Count != 0, "Failing outstanding messages: " + _writtenAwaitingResponse.Count);
while (_writtenAwaitingResponse.Count != 0) while (_writtenAwaitingResponse.Count != 0)
{ {
var next = _writtenAwaitingResponse.Dequeue(); var next = _writtenAwaitingResponse.Dequeue();
BridgeCouldBeNull?.Trace("Failing: " + next); var ex = innerException is RedisException ? innerException : outerException;
next.SetException(innerException is RedisException ? innerException : outerException); if (bridge != null)
BridgeCouldBeNull.CompleteSyncOrAsync(next); {
bridge.Trace("Failing: " + next);
bridge.Multiplexer?.OnMessageFaulted(next, ex, origin);
}
next.SetException(ex);
bridge.CompleteSyncOrAsync(next);
} }
} }
...@@ -527,6 +531,7 @@ internal void OnBridgeHeartbeat() ...@@ -527,6 +531,7 @@ internal void OnBridgeHeartbeat()
if (msg.HasAsyncTimedOut(now, timeout, out var elapsed)) if (msg.HasAsyncTimedOut(now, timeout, out var elapsed))
{ {
var timeoutEx = ExceptionFactory.Timeout(includeDetail, $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server); var timeoutEx = ExceptionFactory.Timeout(includeDetail, $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server);
bridge.Multiplexer?.OnMessageFaulted(msg, timeoutEx);
msg.SetException(timeoutEx); // tell the message that it is doomed msg.SetException(timeoutEx); // tell the message that it is doomed
bridge.CompleteSyncOrAsync(msg); // prod it - kicks off async continuations etc bridge.CompleteSyncOrAsync(msg); // prod it - kicks off async continuations etc
bridge.Multiplexer.OnAsyncTimeout(); bridge.Multiplexer.OnAsyncTimeout();
......
...@@ -83,6 +83,24 @@ public bool MoveNext() ...@@ -83,6 +83,24 @@ public bool MoveNext()
/// </summary> /// </summary>
public int Count() => _count; public int Count() => _count;
/// <summary>
/// Returns the number of commands captured in this snapshot that match a condition
/// </summary>
public int Count(Func<IProfiledCommand, bool> predicate)
{
if (predicate == null) throw new ArgumentNullException(nameof(predicate));
if (_count == 0) return 0;
int result = 0;
var cur = _head;
for (int i = 0; i < _count; i++)
{
if (predicate(cur)) result++;
cur = cur.NextElement;
}
return result;
}
/// <summary> /// <summary>
/// Returns the captured commands as an array /// Returns the captured commands as an array
/// </summary> /// </summary>
......
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Net; using System.Net;
using System.Runtime.CompilerServices;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
...@@ -246,6 +248,23 @@ internal bool Validate(ConnectionMultiplexer multiplexer, RedisChannel channel) ...@@ -246,6 +248,23 @@ internal bool Validate(ConnectionMultiplexer multiplexer, RedisChannel channel)
return changed; return changed;
} }
} }
#if TEST
internal event Action<string, Exception, string> MessageFaulted;
#else
internal event Action<string, Exception, string> MessageFaulted
{ // completely empty shell event, just to keep the test suite compiling
add { } remove { }
}
#endif
[Conditional("TEST")]
internal void OnMessageFaulted(Message msg, Exception fault, [CallerMemberName] string origin = null)
{
#if TEST
MessageFaulted?.Invoke(msg?.CommandAndKey, fault, origin);
#endif
}
} }
internal sealed class RedisSubscriber : RedisBase, ISubscriber internal sealed class RedisSubscriber : RedisBase, ISubscriber
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment