Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
S
StackExchange.Redis
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
tsai
StackExchange.Redis
Commits
a541e22d
Commit
a541e22d
authored
Oct 27, 2014
by
Marc Gravell
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #113 from PashaPash/physical-connection-race
Fixed race condition in PhysicalConnection creation
parents
5ba566b1
804bc3ce
Changes
7
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
206 additions
and
19 deletions
+206
-19
ConnectingFailDetection.cs
StackExchange.Redis.Tests/ConnectingFailDetection.cs
+106
-0
StackExchange.Redis.Tests.csproj
StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj
+1
-0
TestBase.cs
StackExchange.Redis.Tests/TestBase.cs
+4
-0
DebuggingAids.cs
StackExchange.Redis/StackExchange/Redis/DebuggingAids.cs
+73
-0
PhysicalBridge.cs
StackExchange.Redis/StackExchange/Redis/PhysicalBridge.cs
+4
-1
PhysicalConnection.cs
...kExchange.Redis/StackExchange/Redis/PhysicalConnection.cs
+7
-3
SocketManager.cs
StackExchange.Redis/StackExchange/Redis/SocketManager.cs
+11
-15
No files found.
StackExchange.Redis.Tests/ConnectingFailDetection.cs
0 → 100644
View file @
a541e22d
using
NUnit.Framework
;
using
System
;
using
System.Threading
;
namespace
StackExchange.Redis.Tests
{
[
TestFixture
]
public
class
ConnectingFailDetection
:
TestBase
{
#if DEBUG
[
TestCase
]
public
void
FastNoticesFailOnConnectingSync
()
{
try
{
using
(
var
muxer
=
Create
(
keepAlive
:
1
,
connectTimeout
:
10000
,
allowAdmin
:
true
))
{
var
conn
=
muxer
.
GetDatabase
();
conn
.
Ping
();
var
server
=
muxer
.
GetServer
(
muxer
.
GetEndPoints
()[
0
]);
muxer
.
AllowConnect
=
false
;
SocketManager
.
ConnectCompletionType
=
CompletionType
.
Sync
;
server
.
SimulateConnectionFailure
();
Assert
.
IsFalse
(
muxer
.
IsConnected
);
// should reconnect within 1 keepalive interval
muxer
.
AllowConnect
=
true
;
Console
.
WriteLine
(
"Waiting for reconnect"
);
Thread
.
Sleep
(
2000
);
Assert
.
IsTrue
(
muxer
.
IsConnected
);
}
ClearAmbientFailures
();
}
finally
{
SocketManager
.
ConnectCompletionType
=
CompletionType
.
Any
;
}
}
[
TestCase
]
public
void
ConnectsWhenBeginConnectCompletesSynchronously
()
{
try
{
SocketManager
.
ConnectCompletionType
=
CompletionType
.
Sync
;
using
(
var
muxer
=
Create
(
keepAlive
:
1
,
connectTimeout
:
3000
))
{
var
conn
=
muxer
.
GetDatabase
();
conn
.
Ping
();
Assert
.
IsTrue
(
muxer
.
IsConnected
);
}
ClearAmbientFailures
();
}
finally
{
SocketManager
.
ConnectCompletionType
=
CompletionType
.
Any
;
}
}
[
TestCase
]
public
void
FastNoticesFailOnConnectingAsync
()
{
try
{
using
(
var
muxer
=
Create
(
keepAlive
:
1
,
connectTimeout
:
10000
,
allowAdmin
:
true
))
{
var
conn
=
muxer
.
GetDatabase
();
conn
.
Ping
();
var
server
=
muxer
.
GetServer
(
muxer
.
GetEndPoints
()[
0
]);
muxer
.
AllowConnect
=
false
;
SocketManager
.
ConnectCompletionType
=
CompletionType
.
Async
;
server
.
SimulateConnectionFailure
();
Assert
.
IsFalse
(
muxer
.
IsConnected
);
// should reconnect within 1 keepalive interval
muxer
.
AllowConnect
=
true
;
Console
.
WriteLine
(
"Waiting for reconnect"
);
Thread
.
Sleep
(
2000
);
Assert
.
IsTrue
(
muxer
.
IsConnected
);
ClearAmbientFailures
();
}
}
finally
{
SocketManager
.
ConnectCompletionType
=
CompletionType
.
Any
;
}
}
#endif
}
}
StackExchange.Redis.Tests/StackExchange.Redis.Tests.csproj
View file @
a541e22d
...
...
@@ -66,6 +66,7 @@
<ItemGroup>
<Compile
Include=
"AsyncTests.cs"
/>
<Compile
Include=
"BasicOps.cs"
/>
<Compile
Include=
"ConnectingFailDetection.cs"
/>
<Compile
Include=
"HyperLogLog.cs"
/>
<Compile
Include=
"WrapperBaseTests.cs"
/>
<Compile
Include=
"TransactionWrapperTests.cs"
/>
...
...
StackExchange.Redis.Tests/TestBase.cs
View file @
a541e22d
...
...
@@ -76,6 +76,10 @@ protected void OnInternalError(object sender, InternalErrorEventArgs e)
volatile
int
expectedFailCount
;
[
SetUp
]
public
void
Setup
()
{
ClearAmbientFailures
();
}
public
void
ClearAmbientFailures
()
{
Collect
();
Interlocked
.
Exchange
(
ref
failCount
,
0
);
...
...
StackExchange.Redis/StackExchange/Redis/DebuggingAids.cs
View file @
a541e22d
...
...
@@ -191,6 +191,16 @@ partial class SocketManager
{
ignore
=
callback
.
IgnoreConnect
;
}
/// <summary>
/// Completion type for BeginConnect call
/// </summary>
public
static
CompletionType
ConnectCompletionType
{
get
;
set
;
}
partial
void
ShouldForceConnectCompletionType
(
ref
CompletionType
completionType
)
{
completionType
=
SocketManager
.
ConnectCompletionType
;
}
}
partial
interface
ISocketCallback
{
...
...
@@ -253,6 +263,69 @@ bool ISocketCallback.IgnoreConnect
}
#endif
/// <summary>
/// Completion type for CompletionTypeHelper
/// </summary>
public
enum
CompletionType
{
/// <summary>
/// Retain original completion type (either sync or async)
/// </summary>
Any
=
0
,
/// <summary>
/// Force sync completion
/// </summary>
Sync
=
1
,
/// <summary>
/// Force async completion
/// </summary>
Async
=
2
}
internal
class
CompletionTypeHelper
{
public
static
void
RunWithCompletionType
(
Func
<
AsyncCallback
,
IAsyncResult
>
beginAsync
,
AsyncCallback
callback
,
CompletionType
completionType
)
{
AsyncCallback
proxyCallback
;
if
(
completionType
==
CompletionType
.
Any
)
{
proxyCallback
=
(
ar
)
=>
{
if
(!
ar
.
CompletedSynchronously
)
{
callback
(
ar
);
}
};
}
else
{
proxyCallback
=
(
ar
)
=>
{
};
}
var
result
=
beginAsync
(
proxyCallback
);
if
(
completionType
==
CompletionType
.
Any
&&
!
result
.
CompletedSynchronously
)
{
return
;
}
result
.
AsyncWaitHandle
.
WaitOne
();
switch
(
completionType
)
{
case
CompletionType
.
Async
:
ThreadPool
.
QueueUserWorkItem
((
s
)
=>
{
callback
(
result
);
});
break
;
case
CompletionType
.
Any
:
case
CompletionType
.
Sync
:
callback
(
result
);
break
;
}
return
;
}
}
#if VERBOSE
partial
class
ConnectionMultiplexer
...
...
StackExchange.Redis/StackExchange/Redis/PhysicalBridge.cs
View file @
a541e22d
...
...
@@ -388,7 +388,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
long
newSampleCount
=
Interlocked
.
Read
(
ref
operationCount
);
Interlocked
.
Exchange
(
ref
profileLog
[
index
%
ProfileLogSamples
],
newSampleCount
);
Interlocked
.
Exchange
(
ref
profileLastLog
,
newSampleCount
);
Trace
(
"OnHeartbeat: "
+
(
State
)
state
);
switch
(
state
)
{
case
(
int
)
State
.
Connecting
:
...
...
@@ -709,7 +709,10 @@ private PhysicalConnection GetConnection()
{
Interlocked
.
Increment
(
ref
socketCount
);
Interlocked
.
Exchange
(
ref
connectStartTicks
,
Environment
.
TickCount
);
// separate creation and connection for case when connection completes synchronously
// in that case PhysicalConnection will call back to PhysicalBridge, and most of PhysicalBridge methods assumes that physical is not null;
physical
=
new
PhysicalConnection
(
this
);
physical
.
BeginConnect
();
}
}
return
null
;
...
...
StackExchange.Redis/StackExchange/Redis/PhysicalConnection.cs
View file @
a541e22d
...
...
@@ -80,11 +80,15 @@ public PhysicalConnection(PhysicalBridge bridge)
var
endpoint
=
bridge
.
ServerEndPoint
.
EndPoint
;
physicalName
=
connectionType
+
"#"
+
Interlocked
.
Increment
(
ref
totalCount
)
+
"@"
+
Format
.
ToString
(
endpoint
);
this
.
bridge
=
bridge
;
multiplexer
.
Trace
(
"Connecting..."
,
physicalName
);
OnCreateEcho
();
}
public
void
BeginConnect
()
{
var
endpoint
=
this
.
bridge
.
ServerEndPoint
.
EndPoint
;
multiplexer
.
Trace
(
"Connecting..."
,
physicalName
);
this
.
socketToken
=
multiplexer
.
SocketManager
.
BeginConnect
(
endpoint
,
this
);
//socket.SendTimeout = socket.ReceiveTimeout = multiplexer.TimeoutMilliseconds;
OnCreateEcho
();
}
private
enum
ReadMode
:
byte
...
...
StackExchange.Redis/StackExchange/Redis/SocketManager.cs
View file @
a541e22d
...
...
@@ -124,13 +124,15 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback)
socket
.
NoDelay
=
true
;
try
{
var
ar
=
socket
.
BeginConnect
(
endpoint
,
EndConnect
,
Tuple
.
Create
(
socket
,
callback
));
if
(
ar
.
CompletedSynchronously
)
{
ConnectionMultiplexer
.
TraceWithoutContext
(
"EndConnect (sync)"
);
EndConnectImpl
(
ar
);
CompletionType
connectCompletionType
=
CompletionType
.
Any
;
this
.
ShouldForceConnectCompletionType
(
ref
connectCompletionType
);
CompletionTypeHelper
.
RunWithCompletionType
(
(
cb
)
=>
socket
.
BeginConnect
(
endpoint
,
cb
,
Tuple
.
Create
(
socket
,
callback
)),
(
ar
)
=>
EndConnectImpl
(
ar
),
CompletionType
.
Sync
);
}
}
catch
(
NotImplementedException
ex
)
catch
(
NotImplementedException
ex
)
{
if
(!(
endpoint
is
IPEndPoint
))
{
...
...
@@ -185,14 +187,6 @@ internal void Shutdown(SocketToken token)
Shutdown
(
token
.
Socket
);
}
private
void
EndConnect
(
IAsyncResult
ar
)
{
if
(!
ar
.
CompletedSynchronously
)
{
ConnectionMultiplexer
.
TraceWithoutContext
(
"EndConnect (async)"
);
EndConnectImpl
(
ar
);
}
}
private
void
EndConnectImpl
(
IAsyncResult
ar
)
{
Tuple
<
Socket
,
ISocketCallback
>
tuple
=
null
;
...
...
@@ -262,6 +256,8 @@ private void EndConnectImpl(IAsyncResult ar)
partial
void
ShouldIgnoreConnect
(
ISocketCallback
callback
,
ref
bool
ignore
);
partial
void
ShouldForceConnectCompletionType
(
ref
CompletionType
completionType
);
[
System
.
Diagnostics
.
CodeAnalysis
.
SuppressMessage
(
"Microsoft.Usage"
,
"CA2202:Do not dispose objects multiple times"
)]
private
void
Shutdown
(
Socket
socket
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment