Commit aeedada2 authored by Pavel Pochobut's avatar Pavel Pochobut

Fixed hardcoded connectCompletionType. Dorces async execution of...

Fixed hardcoded connectCompletionType. Dorces async execution of ConnectionMultiplexer.ReconfigureAsync in ConnectionMultiplexer.Connect
parent d994b7ad
using NUnit.Framework;
using System;
using System.Diagnostics;
using System.Threading;
namespace StackExchange.Redis.Tests
{
[TestFixture]
public class ConnectToUnexistingHost : TestBase
{
#if DEBUG
[Test]
[TestCase(CompletionType.Any)]
[TestCase(CompletionType.Sync)]
[TestCase(CompletionType.Async)]
public void ConnectToUnexistingHostFailsWithinTimeout(CompletionType completionType)
{
var sw = Stopwatch.StartNew();
try
{
var config = new ConfigurationOptions
{
EndPoints = { { "invalid", 1234 } },
ConnectTimeout = 1000
};
SocketManager.ConnectCompletionType = completionType;
using (var muxer = ConnectionMultiplexer.Connect(config))
{
Thread.Sleep(10000);
}
Assert.Fail("Connect should fail with RedisConnectionException exception");
}
catch (RedisConnectionException)
{
var elapsed = sw.ElapsedMilliseconds;
if (elapsed > 9000)
{
Assert.Fail("Connect should fail within ConnectTimeout");
}
}
finally
{
SocketManager.ConnectCompletionType = CompletionType.Any;
}
}
#endif
}
}
\ No newline at end of file
...@@ -67,6 +67,7 @@ ...@@ -67,6 +67,7 @@
<Compile Include="AsyncTests.cs" /> <Compile Include="AsyncTests.cs" />
<Compile Include="BasicOps.cs" /> <Compile Include="BasicOps.cs" />
<Compile Include="ConnectingFailDetection.cs" /> <Compile Include="ConnectingFailDetection.cs" />
<Compile Include="ConnectToUnexistingHost.cs" />
<Compile Include="HyperLogLog.cs" /> <Compile Include="HyperLogLog.cs" />
<Compile Include="WrapperBaseTests.cs" /> <Compile Include="WrapperBaseTests.cs" />
<Compile Include="TransactionWrapperTests.cs" /> <Compile Include="TransactionWrapperTests.cs" />
......
...@@ -718,48 +718,35 @@ static ConnectionMultiplexer CreateMultiplexer(object configuration) ...@@ -718,48 +718,35 @@ static ConnectionMultiplexer CreateMultiplexer(object configuration)
/// </summary> /// </summary>
public static ConnectionMultiplexer Connect(string configuration, TextWriter log = null) public static ConnectionMultiplexer Connect(string configuration, TextWriter log = null)
{ {
IDisposable killMe = null; return ConnectImpl(() => CreateMultiplexer(configuration), log);
try
{
var muxer = CreateMultiplexer(configuration);
killMe = muxer;
// note that task has timeouts internally, so it might take *just over* the reegular timeout
var task = muxer.ReconfigureAsync(true, false, log, null, "connect");
if (!task.Wait(muxer.SyncConnectTimeout(true)))
{
task.ObserveErrors();
if (muxer.RawConfig.AbortOnConnectFail)
{
throw new TimeoutException();
}
}
if(!task.Result) throw ExceptionFactory.UnableToConnect(muxer.failureMessage);
killMe = null;
return muxer;
}
finally
{
if (killMe != null) try { killMe.Dispose(); } catch { }
}
} }
/// <summary> /// <summary>
/// Create a new ConnectionMultiplexer instance /// Create a new ConnectionMultiplexer instance
/// </summary> /// </summary>
public static ConnectionMultiplexer Connect(ConfigurationOptions configuration, TextWriter log = null) public static ConnectionMultiplexer Connect(ConfigurationOptions configuration, TextWriter log = null)
{
return ConnectImpl(() => CreateMultiplexer(configuration), log);
}
private static ConnectionMultiplexer ConnectImpl(Func<ConnectionMultiplexer> multiplexerFactory, TextWriter log)
{ {
IDisposable killMe = null; IDisposable killMe = null;
Stopwatch sw = Stopwatch.StartNew();
try try
{ {
var muxer = CreateMultiplexer(configuration); var muxer = multiplexerFactory();
killMe = muxer; killMe = muxer;
// note that task has timeouts internally, so it might take *just over* the reegular timeout // note that task has timeouts internally, so it might take *just over* the regular timeout
var task = muxer.ReconfigureAsync(true, false, log, null, "connect"); // wrap into task to force async execution
var task = Task.Factory.StartNew(() => { return muxer.ReconfigureAsync(true, false, log, null, "connect").Result; });
if (!task.Wait(muxer.SyncConnectTimeout(true))) if (!task.Wait(muxer.SyncConnectTimeout(true)))
{ {
task.ObserveErrors(); task.ObserveErrors();
if (muxer.RawConfig.AbortOnConnectFail) if (muxer.RawConfig.AbortOnConnectFail)
{ {
throw new TimeoutException(); throw ExceptionFactory.UnableToConnect("Timeout");
} }
} }
if (!task.Result) throw ExceptionFactory.UnableToConnect(muxer.failureMessage); if (!task.Result) throw ExceptionFactory.UnableToConnect(muxer.failureMessage);
......
...@@ -130,7 +130,7 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback) ...@@ -130,7 +130,7 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback)
CompletionTypeHelper.RunWithCompletionType( CompletionTypeHelper.RunWithCompletionType(
(cb) => socket.BeginConnect(endpoint, cb, Tuple.Create(socket, callback)), (cb) => socket.BeginConnect(endpoint, cb, Tuple.Create(socket, callback)),
(ar) => EndConnectImpl(ar), (ar) => EndConnectImpl(ar),
CompletionType.Sync); connectCompletionType);
} }
catch (NotImplementedException ex) catch (NotImplementedException ex)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment