Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
S
StackExchange.Redis
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
tsai
StackExchange.Redis
Commits
1e7703c0
Commit
1e7703c0
authored
Dec 10, 2015
by
Marc Gravell
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'master' of github.com:StackExchange/StackExchange.Redis into jeremymeng-netcore
parents
af0b2223
51704fd7
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
155 additions
and
94 deletions
+155
-94
ClusterConfiguration.cs
...xchange.Redis/StackExchange/Redis/ClusterConfiguration.cs
+2
-8
ConnectionMultiplexer.cs
...change.Redis/StackExchange/Redis/ConnectionMultiplexer.cs
+144
-86
EndPointCollection.cs
...kExchange.Redis/StackExchange/Redis/EndPointCollection.cs
+9
-0
No files found.
StackExchange.Redis/StackExchange/Redis/ClusterConfiguration.cs
View file @
1e7703c0
...
...
@@ -281,14 +281,8 @@ internal ClusterNode(ClusterConfiguration configuration, string raw, EndPoint or
var
flags
=
parts
[
2
].
Split
(
StringSplits
.
Comma
);
if
(
flags
.
Contains
(
"myself"
))
{
endpoint
=
origin
;
}
else
{
endpoint
=
Format
.
TryParseEndPoint
(
parts
[
1
]);
}
endpoint
=
Format
.
TryParseEndPoint
(
parts
[
1
]);
nodeId
=
parts
[
0
];
isSlave
=
flags
.
Contains
(
"slave"
);
parentNodeId
=
string
.
IsNullOrWhiteSpace
(
parts
[
3
])
?
null
:
parts
[
3
];
...
...
StackExchange.Redis/StackExchange/Redis/ConnectionMultiplexer.cs
View file @
1e7703c0
...
...
@@ -1216,114 +1216,155 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
}
const
CommandFlags
flags
=
CommandFlags
.
NoRedirect
|
CommandFlags
.
HighPriority
;
var
available
=
new
Task
<
bool
>[
endpoints
.
Count
];
var
servers
=
new
ServerEndPoint
[
available
.
Length
];
List
<
ServerEndPoint
>
masters
=
new
List
<
ServerEndPoint
>(
endpoints
.
Count
);
bool
useTieBreakers
=
!
string
.
IsNullOrWhiteSpace
(
configuration
.
TieBreaker
);
var
tieBreakers
=
useTieBreakers
?
new
Task
<
string
>[
endpoints
.
Count
]
:
null
;
RedisKey
tieBreakerKey
=
useTieBreakers
?
(
RedisKey
)
configuration
.
TieBreaker
:
default
(
RedisKey
);
for
(
int
i
=
0
;
i
<
available
.
Length
;
i
++)
ServerEndPoint
[]
servers
=
null
;
Task
<
string
>[]
tieBreakers
=
null
;
bool
encounteredConnectedClusterServer
=
false
;
Stopwatch
watch
=
null
;
int
iterCount
=
first
?
2
:
1
;
// this is fix for https://github.com/StackExchange/StackExchange.Redis/issues/300
// auto discoverability of cluster nodes is made synchronous.
// we try to connect to endpoints specified inside the user provided configuration
// and when we encounter one such endpoint to which we are able to successfully connect,
// we get the list of cluster nodes from this endpoint and try to proactively connect
// to these nodes instead of relying on auto configure
for
(
int
iter
=
0
;
iter
<
iterCount
;
++
iter
)
{
Trace
(
"Testing: "
+
Format
.
ToString
(
endpoints
[
i
]));
var
server
=
GetServerEndPoint
(
endpoints
[
i
]);
//server.ReportNextFailure();
servers
[
i
]
=
server
;
if
(
reconfigureAll
&&
server
.
IsConnected
)
if
(
endpoints
==
null
)
break
;
var
available
=
new
Task
<
bool
>[
endpoints
.
Count
];
tieBreakers
=
useTieBreakers
?
new
Task
<
string
>[
endpoints
.
Count
]
:
null
;
servers
=
new
ServerEndPoint
[
available
.
Length
];
RedisKey
tieBreakerKey
=
useTieBreakers
?
(
RedisKey
)
configuration
.
TieBreaker
:
default
(
RedisKey
);
for
(
int
i
=
0
;
i
<
available
.
Length
;
i
++)
{
LogLocked
(
log
,
"Refreshing {0}..."
,
Format
.
ToString
(
server
.
EndPoint
));
// note that these will be processed synchronously *BEFORE* the tracer is processed,
// so we know that the configuration will be up to date if we see the tracer
server
.
AutoConfigure
(
null
);
}
available
[
i
]
=
server
.
SendTracer
(
log
);
if
(
useTieBreakers
)
{
LogLocked
(
log
,
"Requesting tie-break from {0} > {1}..."
,
Format
.
ToString
(
server
.
EndPoint
),
configuration
.
TieBreaker
);
Message
msg
=
Message
.
Create
(
0
,
flags
,
RedisCommand
.
GET
,
tieBreakerKey
);
msg
.
SetInternalCall
();
msg
=
LoggingMessage
.
Create
(
log
,
msg
);
tieBreakers
[
i
]
=
server
.
QueueDirectAsync
(
msg
,
ResultProcessor
.
String
);
Trace
(
"Testing: "
+
Format
.
ToString
(
endpoints
[
i
]));
var
server
=
GetServerEndPoint
(
endpoints
[
i
]);
//server.ReportNextFailure();
servers
[
i
]
=
server
;
if
(
reconfigureAll
&&
server
.
IsConnected
)
{
LogLocked
(
log
,
"Refreshing {0}..."
,
Format
.
ToString
(
server
.
EndPoint
));
// note that these will be processed synchronously *BEFORE* the tracer is processed,
// so we know that the configuration will be up to date if we see the tracer
server
.
AutoConfigure
(
null
);
}
available
[
i
]
=
server
.
SendTracer
(
log
);
if
(
useTieBreakers
)
{
LogLocked
(
log
,
"Requesting tie-break from {0} > {1}..."
,
Format
.
ToString
(
server
.
EndPoint
),
configuration
.
TieBreaker
);
Message
msg
=
Message
.
Create
(
0
,
flags
,
RedisCommand
.
GET
,
tieBreakerKey
);
msg
.
SetInternalCall
();
msg
=
LoggingMessage
.
Create
(
log
,
msg
);
tieBreakers
[
i
]
=
server
.
QueueDirectAsync
(
msg
,
ResultProcessor
.
String
);
}
}
}
LogLocked
(
log
,
"Allowing endpoints {0} to respond..."
,
TimeSpan
.
FromMilliseconds
(
configuration
.
ConnectTimeout
));
Trace
(
"Allowing endpoints "
+
TimeSpan
.
FromMilliseconds
(
configuration
.
ConnectTimeout
)
+
" to respond..."
);
await
WaitAllIgnoreErrorsAsync
(
available
,
configuration
.
ConnectTimeout
,
log
).
ForAwait
();
List
<
ServerEndPoint
>
masters
=
new
List
<
ServerEndPoint
>(
available
.
Length
);
watch
=
watch
??
Stopwatch
.
StartNew
();
var
remaining
=
configuration
.
ConnectTimeout
-
checked
((
int
)
watch
.
ElapsedMilliseconds
);
LogLocked
(
log
,
"Allowing endpoints {0} to respond..."
,
TimeSpan
.
FromMilliseconds
(
remaining
));
Trace
(
"Allowing endpoints "
+
TimeSpan
.
FromMilliseconds
(
remaining
)
+
" to respond..."
);
await
WaitAllIgnoreErrorsAsync
(
available
,
remaining
,
log
).
ForAwait
();
for
(
int
i
=
0
;
i
<
available
.
Length
;
i
++)
{
var
task
=
available
[
i
];
Trace
(
Format
.
ToString
(
endpoints
[
i
])
+
": "
+
task
.
Status
);
if
(
task
.
IsFaulted
)
EndPointCollection
updatedClusterEndpointCollection
=
null
;
for
(
int
i
=
0
;
i
<
available
.
Length
;
i
++)
{
servers
[
i
].
SetUnselectable
(
UnselectableFlags
.
DidNotRespond
);
var
aex
=
task
.
Exception
;
foreach
(
var
ex
in
aex
.
InnerExceptions
)
var
task
=
available
[
i
];
Trace
(
Format
.
ToString
(
endpoints
[
i
])
+
": "
+
task
.
Status
);
if
(
task
.
IsFaulted
)
{
servers
[
i
].
SetUnselectable
(
UnselectableFlags
.
DidNotRespond
);
var
aex
=
task
.
Exception
;
foreach
(
var
ex
in
aex
.
InnerExceptions
)
{
LogLocked
(
log
,
"{0} faulted: {1}"
,
Format
.
ToString
(
endpoints
[
i
]),
ex
.
Message
);
failureMessage
=
ex
.
Message
;
}
}
else
if
(
task
.
IsCanceled
)
{
LogLocked
(
log
,
"{0} faulted: {1}"
,
Format
.
ToString
(
endpoints
[
i
]),
ex
.
Message
);
failureMessage
=
ex
.
Message
;
servers
[
i
].
SetUnselectable
(
UnselectableFlags
.
DidNotRespond
);
LogLocked
(
log
,
"{0} was canceled"
,
Format
.
ToString
(
endpoints
[
i
]))
;
}
}
else
if
(
task
.
IsCanceled
)
{
servers
[
i
].
SetUnselectable
(
UnselectableFlags
.
DidNotRespond
);
LogLocked
(
log
,
"{0} was canceled"
,
Format
.
ToString
(
endpoints
[
i
]));
}
else
if
(
task
.
IsCompleted
)
{
var
server
=
servers
[
i
];
if
(
task
.
Result
)
else
if
(
task
.
IsCompleted
)
{
servers
[
i
].
ClearUnselectable
(
UnselectableFlags
.
DidNotRespond
);
LogLocked
(
log
,
"{0} returned with success"
,
Format
.
ToString
(
endpoints
[
i
]));
// count the server types
switch
(
server
.
ServerType
)
var
server
=
servers
[
i
];
if
(
task
.
Result
)
{
case
ServerType
.
Twemproxy
:
case
ServerType
.
Standalone
:
standaloneCount
++;
break
;
case
ServerType
.
Sentinel
:
sentinelCount
++;
break
;
case
ServerType
.
Cluster
:
clusterCount
++;
break
;
}
servers
[
i
].
ClearUnselectable
(
UnselectableFlags
.
DidNotRespond
);
LogLocked
(
log
,
"{0} returned with success"
,
Format
.
ToString
(
endpoints
[
i
]));
// count the server types
switch
(
server
.
ServerType
)
{
case
ServerType
.
Twemproxy
:
case
ServerType
.
Standalone
:
standaloneCount
++;
break
;
case
ServerType
.
Sentinel
:
sentinelCount
++;
break
;
case
ServerType
.
Cluster
:
clusterCount
++;
break
;
}
// set the server UnselectableFlags and update masters list
switch
(
server
.
ServerType
)
if
(
clusterCount
>
0
&&
!
encounteredConnectedClusterServer
)
{
// we have encountered a connected server with clustertype for the first time.
// so we will get list of other nodes from this server using "CLUSTER NODES" command
// and try to connect to these other nodes in the next iteration
encounteredConnectedClusterServer
=
true
;
updatedClusterEndpointCollection
=
GetEndpointsFromClusterNodes
(
server
,
log
);
}
// set the server UnselectableFlags and update masters list
switch
(
server
.
ServerType
)
{
case
ServerType
.
Twemproxy
:
case
ServerType
.
Sentinel
:
case
ServerType
.
Standalone
:
case
ServerType
.
Cluster
:
servers
[
i
].
ClearUnselectable
(
UnselectableFlags
.
ServerType
);
if
(
server
.
IsSlave
)
{
servers
[
i
].
ClearUnselectable
(
UnselectableFlags
.
RedundantMaster
);
}
else
{
masters
.
Add
(
server
);
}
break
;
default
:
servers
[
i
].
SetUnselectable
(
UnselectableFlags
.
ServerType
);
break
;
}
}
else
{
case
ServerType
.
Twemproxy
:
case
ServerType
.
Sentinel
:
case
ServerType
.
Standalone
:
case
ServerType
.
Cluster
:
servers
[
i
].
ClearUnselectable
(
UnselectableFlags
.
ServerType
);
if
(
server
.
IsSlave
)
{
servers
[
i
].
ClearUnselectable
(
UnselectableFlags
.
RedundantMaster
);
}
else
{
masters
.
Add
(
server
);
}
break
;
default
:
servers
[
i
].
SetUnselectable
(
UnselectableFlags
.
ServerType
);
break
;
servers
[
i
].
SetUnselectable
(
UnselectableFlags
.
DidNotRespond
);
LogLocked
(
log
,
"{0} returned, but incorrectly"
,
Format
.
ToString
(
endpoints
[
i
]));
}
}
else
{
servers
[
i
].
SetUnselectable
(
UnselectableFlags
.
DidNotRespond
);
LogLocked
(
log
,
"{0}
returned, but incorrectly
"
,
Format
.
ToString
(
endpoints
[
i
]));
LogLocked
(
log
,
"{0}
did not respond
"
,
Format
.
ToString
(
endpoints
[
i
]));
}
}
if
(
encounteredConnectedClusterServer
)
{
endpoints
=
updatedClusterEndpointCollection
;
}
else
{
servers
[
i
].
SetUnselectable
(
UnselectableFlags
.
DidNotRespond
);
LogLocked
(
log
,
"{0} did not respond"
,
Format
.
ToString
(
endpoints
[
i
]));
break
;
// we do not want to repeat the second iteration
}
}
...
...
@@ -1432,6 +1473,23 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
}
}
private
EndPointCollection
GetEndpointsFromClusterNodes
(
ServerEndPoint
server
,
TextWriter
log
)
{
var
message
=
Message
.
Create
(-
1
,
CommandFlags
.
None
,
RedisCommand
.
CLUSTER
,
RedisLiterals
.
NODES
);
ClusterConfiguration
clusterConfig
=
null
;
try
{
clusterConfig
=
this
.
ExecuteSyncImpl
(
message
,
ResultProcessor
.
ClusterNodes
,
server
);
return
new
EndPointCollection
(
clusterConfig
.
Nodes
.
Select
(
node
=>
node
.
EndPoint
).
ToList
());
}
catch
(
Exception
ex
)
{
LogLocked
(
log
,
"Encountered error while updating cluster config: "
+
ex
.
Message
);
return
null
;
}
}
private
void
ResetAllNonConnected
()
{
var
snapshot
=
serverSnapshot
;
...
...
StackExchange.Redis/StackExchange/Redis/EndPointCollection.cs
View file @
1e7703c0
using
System
;
using
System.Collections.Generic
;
using
System.Collections.ObjectModel
;
using
System.Net
;
...
...
@@ -9,6 +10,14 @@ namespace StackExchange.Redis
/// </summary>
public
sealed
class
EndPointCollection
:
Collection
<
EndPoint
>
{
public
EndPointCollection
()
:
base
()
{
}
public
EndPointCollection
(
IList
<
EndPoint
>
endpoints
)
:
base
(
endpoints
)
{
}
/// <summary>
/// Format an endpoint
/// </summary>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment