Commit 2e5923b0 authored by Nick Craver's avatar Nick Craver

Misc cleanup (including line endings for some)

parent d5fff16f
......@@ -103,7 +103,7 @@ protected void OnInternalError(object sender, InternalErrorEventArgs e)
}
private int privateFailCount;
private static AsyncLocal<int> sharedFailCount = new AsyncLocal<int>();
private static readonly AsyncLocal<int> sharedFailCount = new AsyncLocal<int>();
private volatile int expectedFailCount;
private readonly List<string> privateExceptions = new List<string>();
......
......@@ -97,8 +97,8 @@ public static bool TryParse(string range, out SlotRange value)
/// <param name="other">The other slot range to compare to.</param>
public int CompareTo(SlotRange other)
{
int delta = (int)this.from - (int)other.from;
return delta == 0 ? (int)this.to - (int)other.to : delta;
int delta = (int)from - (int)other.from;
return delta == 0 ? (int)to - (int)other.to : delta;
}
/// <summary>
......@@ -161,7 +161,7 @@ internal ClusterConfiguration(ServerSelectionStrategy serverSelectionStrategy, s
{
// Beware: Any exception thrown here will wreak silent havoc like inability to connect to cluster nodes or non returning calls
this.serverSelectionStrategy = serverSelectionStrategy;
this.Origin = origin;
Origin = origin;
using (var reader = new StringReader(nodes))
{
string line;
......@@ -178,7 +178,7 @@ internal ClusterConfiguration(ServerSelectionStrategy serverSelectionStrategy, s
// make sure that things like clusterConfiguration[clusterConfiguration.Origin]
// will work as expected.
if (node.IsMyself)
this.Origin = node.EndPoint;
Origin = node.EndPoint;
if (nodeLookup.ContainsKey(node.EndPoint))
{
......@@ -286,7 +286,7 @@ internal ClusterNode(ClusterConfiguration configuration, string raw, EndPoint or
{
// https://redis.io/commands/cluster-nodes
this.configuration = configuration;
this.Raw = raw;
Raw = raw;
var parts = raw.Split(StringSplits.Space);
var flags = parts[2].Split(StringSplits.Comma);
......@@ -319,8 +319,7 @@ internal ClusterNode(ClusterConfiguration configuration, string raw, EndPoint or
{
if (SlotRange.TryParse(parts[i], out SlotRange range))
{
if (slots == null) slots = new List<SlotRange>(parts.Length - i);
slots.Add(range);
(slots ?? (slots = new List<SlotRange>(parts.Length - i))).Add(range);
}
}
Slots = slots?.AsReadOnly() ?? NoSlots;
......@@ -338,10 +337,9 @@ public IList<ClusterNode> Children
List<ClusterNode> nodes = null;
foreach (var node in configuration.Nodes)
{
if (node.ParentNodeId == this.NodeId)
if (node.ParentNodeId == NodeId)
{
if (nodes == null) nodes = new List<ClusterNode>();
nodes.Add(node);
(nodes ?? (nodes = new List<ClusterNode>())).Add(node);
}
}
children = nodes?.AsReadOnly() ?? NoNodes;
......@@ -416,14 +414,14 @@ public int CompareTo(ClusterNode other)
{
if (other == null) return -1;
if (this.IsSlave != other.IsSlave) return IsSlave ? 1 : -1; // masters first
if (IsSlave != other.IsSlave) return IsSlave ? 1 : -1; // masters first
if (IsSlave) // both slaves? compare by parent, so we get masters A, B, C and then slaves of A, B, C
{
int i = string.CompareOrdinal(this.ParentNodeId, other.ParentNodeId);
int i = string.CompareOrdinal(ParentNodeId, other.ParentNodeId);
if (i != 0) return i;
}
return string.CompareOrdinal(this.NodeId, other.NodeId);
return string.CompareOrdinal(NodeId, other.NodeId);
}
/// <summary>
......
......@@ -8,27 +8,27 @@ namespace StackExchange.Redis
/// </summary>
public abstract class Condition
{
internal abstract Condition MapKeys(Func<RedisKey,RedisKey> map);
internal abstract Condition MapKeys(Func<RedisKey, RedisKey> map);
private Condition() { }
private Condition() { }
/// <summary>
/// Enforces that the given hash-field must have the specified value.
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// <param name="hashField">The field in the hash to check.</param>
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// <param name="hashField">The field in the hash to check.</param>
/// <param name="value">The value that the hash field must match.</param>
public static Condition HashEqual(RedisKey key, RedisValue hashField, RedisValue value)
{
if (hashField.IsNull) throw new ArgumentNullException(nameof(hashField));
if (value.IsNull) return HashNotExists(key, hashField);
return new EqualsCondition(key, hashField, true, value);
}
}
/// <summary>
/// Enforces that the given hash-field must exist.
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// <param name="hashField">The field in the hash to check.</param>
public static Condition HashExists(RedisKey key, RedisValue hashField)
{
......@@ -38,9 +38,9 @@ public static Condition HashExists(RedisKey key, RedisValue hashField)
/// <summary>
/// Enforces that the given hash-field must not have the specified value.
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// <param name="hashField">The field in the hash to check.</param>
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// <param name="hashField">The field in the hash to check.</param>
/// <param name="value">The value that the hash field must not match.</param>
public static Condition HashNotEqual(RedisKey key, RedisValue hashField, RedisValue value)
{
......@@ -51,8 +51,8 @@ public static Condition HashNotEqual(RedisKey key, RedisValue hashField, RedisVa
/// <summary>
/// Enforces that the given hash-field must not exist.
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// <param name="hashField">The field in the hash that must not exist.</param>
public static Condition HashNotExists(RedisKey key, RedisValue hashField)
{
......@@ -62,50 +62,50 @@ public static Condition HashNotExists(RedisKey key, RedisValue hashField)
/// <summary>
/// Enforces that the given key must exist.
/// </summary>
/// </summary>
/// <param name="key">The key that must exist.</param>
public static Condition KeyExists(RedisKey key) => new ExistsCondition(key, RedisType.None, RedisValue.Null, true);
/// <summary>
/// Enforces that the given key must not exist
/// </summary>
/// </summary>
/// <param name="key">The key that must not exist.</param>
public static Condition KeyNotExists(RedisKey key) => new ExistsCondition(key, RedisType.None, RedisValue.Null, false);
/// <summary>
/// Enforces that the given list index must have the specified value
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// <param name="index">The position in the list to check.</param>
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// <param name="index">The position in the list to check.</param>
/// <param name="value">The value of the list position that must match.</param>
public static Condition ListIndexEqual(RedisKey key, long index, RedisValue value) => new ListCondition(key, index, true, value);
/// <summary>
/// Enforces that the given list index must exist
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// <param name="index">The position in the list that must exist.</param>
public static Condition ListIndexExists(RedisKey key, long index) => new ListCondition(key, index, true, null);
/// <summary>
/// Enforces that the given list index must not have the specified value
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// <param name="index">The position in the list to check.</param>
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// <param name="index">The position in the list to check.</param>
/// <param name="value">The value of the list position must not match.</param>
public static Condition ListIndexNotEqual(RedisKey key, long index, RedisValue value) => new ListCondition(key, index, false, value);
/// <summary>
/// Enforces that the given list index must not exist
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// <param name="index">The position in the list that must not exist.</param>
public static Condition ListIndexNotExists(RedisKey key, long index) => new ListCondition(key, index, false, null);
public static Condition ListIndexNotExists(RedisKey key, long index) => new ListCondition(key, index, false, null);
/// <summary>
/// Enforces that the given key must have the specified value
/// </summary>
/// <param name="key">The key to check.</param>
/// </summary>
/// <param name="key">The key to check.</param>
/// <param name="value">The value that must match.</param>
public static Condition StringEqual(RedisKey key, RedisValue value)
{
......@@ -115,8 +115,8 @@ public static Condition StringEqual(RedisKey key, RedisValue value)
/// <summary>
/// Enforces that the given key must not have the specified value
/// </summary>
/// <param name="key">The key to check.</param>
/// </summary>
/// <param name="key">The key to check.</param>
/// <param name="value">The value that must not match.</param>
public static Condition StringNotEqual(RedisKey key, RedisValue value)
{
......@@ -126,134 +126,134 @@ public static Condition StringNotEqual(RedisKey key, RedisValue value)
/// <summary>
/// Enforces that the given hash length is a certain value
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// <param name="length">The length the hash must have.</param>
public static Condition HashLengthEqual(RedisKey key, long length) => new LengthCondition(key, RedisType.Hash, 0, length);
/// <summary>
/// Enforces that the given hash length is less than a certain value
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// <param name="length">The length the hash must be less than.</param>
public static Condition HashLengthLessThan(RedisKey key, long length) => new LengthCondition(key, RedisType.Hash, 1, length);
/// <summary>
/// Enforces that the given hash length is greater than a certain value
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// </summary>
/// <param name="key">The key of the hash to check.</param>
/// <param name="length">The length the hash must be greater than.</param>
public static Condition HashLengthGreaterThan(RedisKey key, long length) => new LengthCondition(key, RedisType.Hash, -1, length);
/// <summary>
/// Enforces that the given string length is a certain value
/// </summary>
/// <param name="key">The key of the string to check.</param>
/// </summary>
/// <param name="key">The key of the string to check.</param>
/// <param name="length">The length the string must be equal to.</param>
public static Condition StringLengthEqual(RedisKey key, long length) => new LengthCondition(key, RedisType.String, 0, length);
/// <summary>
/// Enforces that the given string length is less than a certain value
/// </summary>
/// <param name="key">The key of the string to check.</param>
/// </summary>
/// <param name="key">The key of the string to check.</param>
/// <param name="length">The length the string must be less than.</param>
public static Condition StringLengthLessThan(RedisKey key, long length) => new LengthCondition(key, RedisType.String, 1, length);
/// <summary>
/// Enforces that the given string length is greater than a certain value
/// </summary>
/// <param name="key">The key of the string to check.</param>
/// </summary>
/// <param name="key">The key of the string to check.</param>
/// <param name="length">The length the string must be greater than.</param>
public static Condition StringLengthGreaterThan(RedisKey key, long length) => new LengthCondition(key, RedisType.String, -1, length);
/// <summary>
/// Enforces that the given list length is a certain value
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// <param name="length">The length the list must be equal to.</param>
public static Condition ListLengthEqual(RedisKey key, long length) => new LengthCondition(key, RedisType.List, 0, length);
/// <summary>
/// Enforces that the given list length is less than a certain value
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// <param name="length">The length the list must be less than.</param>
public static Condition ListLengthLessThan(RedisKey key, long length) => new LengthCondition(key, RedisType.List, 1, length);
/// <summary>
/// Enforces that the given list length is greater than a certain value
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// </summary>
/// <param name="key">The key of the list to check.</param>
/// <param name="length">The length the list must be greater than.</param>
public static Condition ListLengthGreaterThan(RedisKey key, long length) => new LengthCondition(key, RedisType.List, -1, length);
/// <summary>
/// Enforces that the given set cardinality is a certain value
/// </summary>
/// <param name="key">The key of the set to check.</param>
/// </summary>
/// <param name="key">The key of the set to check.</param>
/// <param name="length">The length the set must be equal to.</param>
public static Condition SetLengthEqual(RedisKey key, long length) => new LengthCondition(key, RedisType.Set, 0, length);
/// <summary>
/// Enforces that the given set cardinality is less than a certain value
/// </summary>
/// <param name="key">The key of the set to check.</param>
/// </summary>
/// <param name="key">The key of the set to check.</param>
/// <param name="length">The length the set must be less than.</param>
public static Condition SetLengthLessThan(RedisKey key, long length) => new LengthCondition(key, RedisType.Set, 1, length);
/// <summary>
/// Enforces that the given set cardinality is greater than a certain value
/// </summary>
/// <param name="key">The key of the set to check.</param>
/// </summary>
/// <param name="key">The key of the set to check.</param>
/// <param name="length">The length the set must be greater than.</param>
public static Condition SetLengthGreaterThan(RedisKey key, long length) => new LengthCondition(key, RedisType.Set, -1, length);
/// <summary>
/// Enforces that the given set contains a certain member
/// </summary>
/// <param name="key">The key of the set to check.</param>
/// </summary>
/// <param name="key">The key of the set to check.</param>
/// <param name="member">The member the set must contain.</param>
public static Condition SetContains(RedisKey key, RedisValue member) => new ExistsCondition(key, RedisType.Set, member, true);
/// <summary>
/// Enforces that the given set does not contain a certain member
/// </summary>
/// <param name="key">The key of the set to check.</param>
/// </summary>
/// <param name="key">The key of the set to check.</param>
/// <param name="member">The member the set must not contain.</param>
public static Condition SetNotContains(RedisKey key, RedisValue member) => new ExistsCondition(key, RedisType.Set, member, false);
/// <summary>
/// Enforces that the given sorted set cardinality is a certain value
/// </summary>
/// <param name="key">The key of the sorted set to check.</param>
/// </summary>
/// <param name="key">The key of the sorted set to check.</param>
/// <param name="length">The length the sorted set must be equal to.</param>
public static Condition SortedSetLengthEqual(RedisKey key, long length) => new LengthCondition(key, RedisType.SortedSet, 0, length);
/// <summary>
/// Enforces that the given sorted set cardinality is less than a certain value
/// </summary>
/// <param name="key">The key of the sorted set to check.</param>
/// </summary>
/// <param name="key">The key of the sorted set to check.</param>
/// <param name="length">The length the sorted set must be less than.</param>
public static Condition SortedSetLengthLessThan(RedisKey key, long length) => new LengthCondition(key, RedisType.SortedSet, 1, length);
/// <summary>
/// Enforces that the given sorted set cardinality is greater than a certain value
/// </summary>
/// <param name="key">The key of the sorted set to check.</param>
/// </summary>
/// <param name="key">The key of the sorted set to check.</param>
/// <param name="length">The length the sorted set must be greater than.</param>
public static Condition SortedSetLengthGreaterThan(RedisKey key, long length) => new LengthCondition(key, RedisType.SortedSet, -1, length);
/// <summary>
/// Enforces that the given sorted set contains a certain member
/// </summary>
/// <param name="key">The key of the sorted set to check.</param>
/// </summary>
/// <param name="key">The key of the sorted set to check.</param>
/// <param name="member">The member the sorted set must contain.</param>
public static Condition SortedSetContains(RedisKey key, RedisValue member) => new ExistsCondition(key, RedisType.SortedSet, member, true);
/// <summary>
/// Enforces that the given sorted set does not contain a certain member
/// </summary>
/// <param name="key">The key of the sorted set to check.</param>
/// </summary>
/// <param name="key">The key of the sorted set to check.</param>
/// <param name="member">The member the sorted set must not contain.</param>
public static Condition SortedSetNotContains(RedisKey key, RedisValue member) => new ExistsCondition(key, RedisType.SortedSet, member, false);
......@@ -293,7 +293,7 @@ private class ConditionMessage : Message.CommandKeyBase
public ConditionMessage(Condition condition, int db, CommandFlags flags, RedisCommand command, RedisKey key, RedisValue value)
: base(db, flags, command, key)
{
this.Condition = condition;
Condition = condition;
this.value = value; // note no assert here
}
......@@ -315,18 +315,18 @@ internal override void WriteImpl(PhysicalConnection physical)
}
internal class ExistsCondition : Condition
{
private readonly bool expectedResult;
{
private readonly bool expectedResult;
private readonly RedisValue expectedValue;
private readonly RedisKey key;
private readonly RedisKey key;
private readonly RedisType type;
private readonly RedisCommand cmd;
internal override Condition MapKeys(Func<RedisKey,RedisKey> map)
{
return new ExistsCondition(map(key), type, expectedValue, expectedResult);
internal override Condition MapKeys(Func<RedisKey, RedisKey> map)
{
return new ExistsCondition(map(key), type, expectedValue, expectedResult);
}
public ExistsCondition(RedisKey key, RedisType type, RedisValue expectedValue, bool expectedResult)
{
if (key.IsNull) throw new ArgumentException("key");
......@@ -335,11 +335,14 @@ public ExistsCondition(RedisKey key, RedisType type, RedisValue expectedValue, b
this.expectedValue = expectedValue;
this.expectedResult = expectedResult;
if (expectedValue.IsNull) {
if (expectedValue.IsNull)
{
cmd = RedisCommand.EXISTS;
}
else {
switch (type) {
else
{
switch (type)
{
case RedisType.Hash:
cmd = RedisCommand.HEXISTS;
break;
......@@ -376,10 +379,10 @@ internal override IEnumerable<Message> CreateMessages(int db, ResultBox resultBo
}
internal override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(key);
internal override bool TryValidate(RawResult result, out bool value)
{
switch (type)
switch (type)
{
case RedisType.SortedSet:
var parsedValue = result.AsRedisValue();
......@@ -402,12 +405,12 @@ internal override bool TryValidate(RawResult result, out bool value)
}
internal class EqualsCondition : Condition
{
internal override Condition MapKeys(Func<RedisKey,RedisKey> map)
{
return new EqualsCondition(map(key), hashField, expectedEqual, expectedValue);
{
internal override Condition MapKeys(Func<RedisKey, RedisKey> map)
{
return new EqualsCondition(map(key), hashField, expectedEqual, expectedValue);
}
private readonly bool expectedEqual;
private readonly RedisValue hashField, expectedValue;
private readonly RedisKey key;
......@@ -446,7 +449,7 @@ internal override int GetHashSlot(ServerSelectionStrategy serverSelectionStrateg
{
return serverSelectionStrategy.HashSlot(key);
}
internal override bool TryValidate(RawResult result, out bool value)
{
switch (result.Type)
......@@ -466,13 +469,13 @@ internal override bool TryValidate(RawResult result, out bool value)
}
internal class ListCondition : Condition
{
internal override Condition MapKeys(Func<RedisKey,RedisKey> map)
{
return new ListCondition(map(key), index, expectedResult, expectedValue);
{
internal override Condition MapKeys(Func<RedisKey, RedisKey> map)
{
return new ListCondition(map(key), index, expectedResult, expectedValue);
}
private readonly bool expectedResult;
private readonly bool expectedResult;
private readonly long index;
private readonly RedisValue? expectedValue;
private readonly RedisKey key;
......@@ -486,8 +489,8 @@ public ListCondition(RedisKey key, long index, bool expectedResult, RedisValue?
}
public override string ToString()
{
return ((string)key) + "[" + index.ToString() + "]"
{
return ((string)key) + "[" + index.ToString() + "]"
+ (expectedValue.HasValue ? (expectedResult ? " == " : " != ") + expectedValue.Value : (expectedResult ? " exists" : " does not exist"));
}
......@@ -506,7 +509,7 @@ internal sealed override IEnumerable<Message> CreateMessages(int db, ResultBox r
}
internal override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(key);
internal override bool TryValidate(RawResult result, out bool value)
{
switch (result.Type)
......@@ -534,12 +537,12 @@ internal override bool TryValidate(RawResult result, out bool value)
}
internal class LengthCondition : Condition
{
internal override Condition MapKeys(Func<RedisKey,RedisKey> map)
{
return new LengthCondition(map(key), type, compareToResult, expectedLength);
{
internal override Condition MapKeys(Func<RedisKey, RedisKey> map)
{
return new LengthCondition(map(key), type, compareToResult, expectedLength);
}
private readonly int compareToResult;
private readonly long expectedLength;
private readonly RedisKey key;
......@@ -553,7 +556,8 @@ public LengthCondition(RedisKey key, RedisType type, int compareToResult, long e
this.compareToResult = compareToResult;
this.expectedLength = expectedLength;
this.type = type;
switch (type) {
switch (type)
{
case RedisType.Hash:
cmd = RedisCommand.HLEN;
break;
......@@ -580,7 +584,7 @@ public LengthCondition(RedisKey key, RedisType type, int compareToResult, long e
}
public override string ToString()
{
{
return ((string)key) + " " + type + " length" + GetComparisonString() + expectedLength;
}
......@@ -607,7 +611,7 @@ internal override int GetHashSlot(ServerSelectionStrategy serverSelectionStrateg
{
return serverSelectionStrategy.HashSlot(key);
}
internal override bool TryValidate(RawResult result, out bool value)
{
switch (result.Type)
......@@ -616,7 +620,7 @@ internal override bool TryValidate(RawResult result, out bool value)
case ResultType.SimpleString:
case ResultType.Integer:
var parsed = result.AsRedisValue();
value = parsed.IsInteger && (expectedLength.CompareTo((long) parsed) == compareToResult);
value = parsed.IsInteger && (expectedLength.CompareTo((long)parsed) == compareToResult);
ConnectionMultiplexer.TraceWithoutContext("actual: " + (string)parsed + "; expected: " + expectedLength +
"; wanted: " + GetComparisonString() + "; voting: " + value);
return true;
......@@ -640,7 +644,7 @@ public sealed class ConditionResult
internal ConditionResult(Condition condition)
{
this.Condition = condition;
Condition = condition;
resultBox = ResultBox<bool>.Get(condition);
}
......
......@@ -98,7 +98,7 @@ internal static string TryGetAzureRoleInstanceIdNoThrow()
var currentRoleInstanceId = currentRoleInstanceProp.GetValue(null, null);
roleInstanceId = currentRoleInstanceId.GetType().GetProperty("Id").GetValue(currentRoleInstanceId, null).ToString();
if (String.IsNullOrEmpty(roleInstanceId))
if (string.IsNullOrEmpty(roleInstanceId))
{
roleInstanceId = null;
}
......@@ -2050,7 +2050,7 @@ void add(string lk, string sk, string v)
add("Client-Name", "clientName", ClientName);
add("Server-Endpoint", "serverEndpoint", server.EndPoint.ToString());
var hashSlot = message.GetHashSlot(this.ServerSelectionStrategy);
var hashSlot = message.GetHashSlot(ServerSelectionStrategy);
// only add keyslot if its a valid cluster key slot
if (hashSlot != ServerSelectionStrategy.NoSlot)
{
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Collections.Generic;
using System.Text;
namespace StackExchange.Redis
{
internal static class ExceptionFactory
{
private const string
DataCommandKey = "redis-command",
DataSentStatusKey = "request-sent-status",
DataServerKey = "redis-server";
private const string
DataCommandKey = "redis-command",
DataSentStatusKey = "request-sent-status",
DataServerKey = "redis-server";
internal static Exception AdminModeNotEnabled(bool includeDetail, RedisCommand command, Message message, ServerEndPoint server)
{
......@@ -18,23 +18,23 @@ internal static Exception AdminModeNotEnabled(bool includeDetail, RedisCommand c
if (includeDetail) AddDetail(ex, message, server, s);
return ex;
}
internal static Exception CommandDisabled(bool includeDetail, RedisCommand command, Message message, ServerEndPoint server)
{
string s = GetLabel(includeDetail, command, message);
var ex = new RedisCommandException("This operation has been disabled in the command-map and cannot be used: " + s);
if (includeDetail) AddDetail(ex, message, server, s);
return ex;
}
}
internal static Exception TooManyArgs(bool includeDetail, string command, Message message, ServerEndPoint server, int required)
{
string s = GetLabel(includeDetail, command, message);
var ex = new RedisCommandException($"This operation would involve too many arguments ({required} vs the redis limit of {PhysicalConnection.REDIS_MAX_ARGS}): {s}");
if (includeDetail) AddDetail(ex, message, server, s);
return ex;
}
}
internal static Exception CommandDisabled(bool includeDetail, string command, Message message, ServerEndPoint server)
{
string s = GetLabel(includeDetail, command, message);
......@@ -87,88 +87,88 @@ internal static Exception MultiSlot(bool includeDetail, Message message)
if (includeDetail) AddDetail(ex, message, null, null);
return ex;
}
internal static string GetInnerMostExceptionMessage(Exception e)
{
if (e == null)
{
return "";
}
else
{
while (e.InnerException != null)
{
e = e.InnerException;
}
return e.Message;
}
}
internal static Exception NoConnectionAvailable(bool includeDetail, bool includePerformanceCounters, RedisCommand command, Message message, ServerEndPoint server, ServerEndPoint[] serverSnapshot)
{
string commandLabel = GetLabel(includeDetail, command, message);
if (server != null)
{
//if we already have the serverEndpoint for connection failure use that
//otherwise it would output state of all the endpoints
serverSnapshot = new ServerEndPoint[] { server };
internal static string GetInnerMostExceptionMessage(Exception e)
{
if (e == null)
{
return "";
}
else
{
while (e.InnerException != null)
{
e = e.InnerException;
}
return e.Message;
}
}
internal static Exception NoConnectionAvailable(bool includeDetail, bool includePerformanceCounters, RedisCommand command, Message message, ServerEndPoint server, ServerEndPoint[] serverSnapshot)
{
string commandLabel = GetLabel(includeDetail, command, message);
if (server != null)
{
//if we already have the serverEndpoint for connection failure use that
//otherwise it would output state of all the endpoints
serverSnapshot = new ServerEndPoint[] { server };
}
var innerException = PopulateInnerExceptions(serverSnapshot);
StringBuilder exceptionmessage = new StringBuilder("No connection is available to service this operation: ").Append(commandLabel);
string innermostExceptionstring = GetInnerMostExceptionMessage(innerException);
if (!string.IsNullOrEmpty(innermostExceptionstring))
{
exceptionmessage.Append("; ").Append(innermostExceptionstring);
}
#if FEATURE_PERFCOUNTER
if (includeDetail)
{
exceptionmessage.Append("; ").Append(ConnectionMultiplexer.GetThreadPoolAndCPUSummary(includePerformanceCounters));
}
#endif
var ex = new RedisConnectionException(ConnectionFailureType.UnableToResolvePhysicalConnection, exceptionmessage.ToString(), innerException, message?.Status ?? CommandStatus.Unknown);
if (includeDetail)
{
AddDetail(ex, message, server, commandLabel);
}
return ex;
}
internal static Exception PopulateInnerExceptions(ServerEndPoint[] serverSnapshot)
{
var innerExceptions = new List<Exception>();
if (serverSnapshot != null)
{
if (serverSnapshot.Length > 0 && serverSnapshot[0].Multiplexer.LastException != null)
{
innerExceptions.Add(serverSnapshot[0].Multiplexer.LastException);
}
for (int i = 0; i < serverSnapshot.Length; i++)
{
if (serverSnapshot[i].LastException != null)
{
var lastException = serverSnapshot[i].LastException;
innerExceptions.Add(lastException);
}
}
}
var innerException = PopulateInnerExceptions(serverSnapshot);
StringBuilder exceptionmessage = new StringBuilder("No connection is available to service this operation: ").Append(commandLabel);
string innermostExceptionstring = GetInnerMostExceptionMessage(innerException);
if (!string.IsNullOrEmpty(innermostExceptionstring))
{
exceptionmessage.Append("; ").Append(innermostExceptionstring);
}
#if FEATURE_PERFCOUNTER
if (includeDetail)
{
exceptionmessage.Append("; ").Append(ConnectionMultiplexer.GetThreadPoolAndCPUSummary(includePerformanceCounters));
}
#endif
var ex = new RedisConnectionException(ConnectionFailureType.UnableToResolvePhysicalConnection, exceptionmessage.ToString(), innerException, message?.Status ?? CommandStatus.Unknown);
if (includeDetail)
{
AddDetail(ex, message, server, commandLabel);
}
return ex;
}
internal static Exception PopulateInnerExceptions(ServerEndPoint[] serverSnapshot)
{
var innerExceptions = new List<Exception>();
if (serverSnapshot != null)
{
if (serverSnapshot.Length > 0 && serverSnapshot[0].Multiplexer.LastException != null)
{
innerExceptions.Add(serverSnapshot[0].Multiplexer.LastException);
}
for (int i = 0; i < serverSnapshot.Length; i++)
{
if (serverSnapshot[i].LastException != null)
{
var lastException = serverSnapshot[i].LastException;
innerExceptions.Add(lastException);
}
}
}
if (innerExceptions.Count == 1)
{
return innerExceptions[0];
}
else if(innerExceptions.Count > 1)
{
return new AggregateException(innerExceptions);
}
return null;
}
if (innerExceptions.Count == 1)
{
return innerExceptions[0];
}
else if(innerExceptions.Count > 1)
{
return new AggregateException(innerExceptions);
}
return null;
}
internal static Exception NotSupported(bool includeDetail, RedisCommand command)
{
......@@ -177,7 +177,7 @@ internal static Exception NotSupported(bool includeDetail, RedisCommand command)
if (includeDetail) AddDetail(ex, null, null, s);
return ex;
}
internal static Exception NoCursor(RedisCommand command)
{
string s = GetLabel(false, command, null);
......@@ -195,16 +195,16 @@ private static void AddDetail(Exception exception, Message message, ServerEndPoi
{
if (exception != null)
{
if (message != null)
{
exception.Data.Add(DataCommandKey, message.CommandAndKey);
exception.Data.Add(DataSentStatusKey, message.Status);
if (message != null)
{
exception.Data.Add(DataCommandKey, message.CommandAndKey);
exception.Data.Add(DataSentStatusKey, message.Status);
}
else if (label != null)
{
exception.Data.Add(DataCommandKey, label);
}
else if (label != null)
{
exception.Data.Add(DataCommandKey, label);
}
if (server != null) exception.Data.Add(DataServerKey, Format.ToString(server.EndPoint));
}
}
......@@ -212,8 +212,8 @@ private static void AddDetail(Exception exception, Message message, ServerEndPoi
private static string GetLabel(bool includeDetail, RedisCommand command, Message message)
{
return message == null ? command.ToString() : (includeDetail ? message.CommandAndKey : message.Command.ToString());
}
}
private static string GetLabel(bool includeDetail, string command, Message message)
{
return message == null ? command : (includeDetail ? message.CommandAndKey : message.Command.ToString());
......@@ -226,18 +226,18 @@ internal static Exception UnableToConnect(bool abortOnConnect, string failureMes
string.Format("It was not possible to connect to the redis server(s); {0}{1}", abortOnConnectionFailure, failureMessage));
}
internal static Exception BeganProfilingWithDuplicateContext(object forContext)
{
var exc = new InvalidOperationException("Attempted to begin profiling for the same context twice");
exc.Data["forContext"] = forContext;
return exc;
internal static Exception BeganProfilingWithDuplicateContext(object forContext)
{
var exc = new InvalidOperationException("Attempted to begin profiling for the same context twice");
exc.Data["forContext"] = forContext;
return exc;
}
internal static Exception FinishedProfilingWithInvalidContext(object forContext)
{
var exc = new InvalidOperationException("Attempted to finish profiling for a context which is no longer valid, or was never begun");
exc.Data["forContext"] = forContext;
return exc;
internal static Exception FinishedProfilingWithInvalidContext(object forContext)
{
var exc = new InvalidOperationException("Attempted to finish profiling for a context which is no longer valid, or was never begun");
exc.Data["forContext"] = forContext;
return exc;
}
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
#if NETSTANDARD1_5
using System.Threading.Tasks;
#endif
namespace StackExchange.Redis
{
internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback
{
internal readonly byte[] ChannelPrefix;
private const int DefaultRedisDatabaseCount = 16;
private static readonly byte[] Crlf = Encoding.ASCII.GetBytes("\r\n");
#if NETSTANDARD1_5
private readonly Action<Task<int>> endRead;
private static Action<Task<int>> EndReadFactory(PhysicalConnection physical)
{
return result =>
{ // can't capture AsyncState on SocketRead, so we'll do it once per physical instead
if (result.IsFaulted)
{
GC.KeepAlive(result.Exception);
}
try
{
physical.Multiplexer.Trace("Completed asynchronously: processing in callback", physical.physicalName);
if (physical.EndReading(result)) physical.BeginReading();
}
catch (Exception ex)
{
physical.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
};
}
#else
private static readonly AsyncCallback endRead = result =>
{
PhysicalConnection physical;
if (result.CompletedSynchronously || (physical = result.AsyncState as PhysicalConnection) == null) return;
try
{
physical.Multiplexer.Trace("Completed asynchronously: processing in callback", physical.physicalName);
if (physical.EndReading(result)) physical.BeginReading();
}
catch (Exception ex)
{
physical.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
};
#endif
private static readonly byte[] message = Encoding.UTF8.GetBytes("message"), pmessage = Encoding.UTF8.GetBytes("pmessage");
private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
private static readonly Message
ReusableReadOnlyCommand = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.READONLY),
ReusableReadWriteCommand = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.READWRITE);
private static int totalCount;
private readonly ConnectionType connectionType;
// things sent to this physical, but not yet received
private readonly Queue<Message> outstanding = new Queue<Message>();
private readonly string physicalName;
private volatile int currentDatabase = 0;
private ReadMode currentReadMode = ReadMode.NotSpecified;
private int failureReported;
private byte[] ioBuffer = new byte[512];
private int ioBufferBytes = 0;
private int lastWriteTickCount, lastReadTickCount, lastBeatTickCount;
private int firstUnansweredWriteTickCount;
private Stream netStream, outStream;
private SocketToken socketToken;
public PhysicalConnection(PhysicalBridge bridge)
{
lastWriteTickCount = lastReadTickCount = Environment.TickCount;
lastBeatTickCount = 0;
connectionType = bridge.ConnectionType;
Multiplexer = bridge.Multiplexer;
ChannelPrefix = Multiplexer.RawConfig.ChannelPrefix;
if (ChannelPrefix?.Length == 0) ChannelPrefix = null; // null tests are easier than null+empty
var endpoint = bridge.ServerEndPoint.EndPoint;
physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + Format.ToString(endpoint);
Bridge = bridge;
#if NETSTANDARD1_5
endRead = EndReadFactory(this);
#endif
OnCreateEcho();
}
public void BeginConnect(TextWriter log)
{
VolatileWrapper.Write(ref firstUnansweredWriteTickCount, 0);
var endpoint = Bridge.ServerEndPoint.EndPoint;
Multiplexer.Trace("Connecting...", physicalName);
socketToken = Multiplexer.SocketManager.BeginConnect(endpoint, this, Multiplexer, log);
}
private enum ReadMode : byte
{
NotSpecified,
ReadOnly,
ReadWrite
}
public PhysicalBridge Bridge { get; }
public long LastWriteSecondsAgo => unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastWriteTickCount)) / 1000;
public ConnectionMultiplexer Multiplexer { get; }
public long SubscriptionCount { get; set; }
public bool TransactionActive { get; internal set; }
public void Dispose()
{
if (outStream != null)
{
Multiplexer.Trace("Disconnecting...", physicalName);
#if !NETSTANDARD1_5
try { outStream.Close(); } catch { }
#endif
try { outStream.Dispose(); } catch { }
outStream = null;
}
if (netStream != null)
{
#if !NETSTANDARD1_5
try { netStream.Close(); } catch { }
#endif
try { netStream.Dispose(); } catch { }
netStream = null;
}
if (socketToken.HasValue)
{
Multiplexer.SocketManager?.Shutdown(socketToken);
socketToken = default(SocketToken);
Multiplexer.Trace("Disconnected", physicalName);
RecordConnectionFailed(ConnectionFailureType.ConnectionDisposed);
}
OnCloseEcho();
}
public void Flush()
{
var tmp = outStream;
if (tmp != null)
{
tmp.Flush();
Interlocked.Exchange(ref lastWriteTickCount, Environment.TickCount);
}
}
public void RecordConnectionFailed(ConnectionFailureType failureType, Exception innerException = null, [CallerMemberName] string origin = null)
{
var mgrState = SocketManager.ManagerState.CheckForStaleConnections;
RecordConnectionFailed(failureType, ref mgrState, innerException, origin);
}
public void RecordConnectionFailed(ConnectionFailureType failureType, ref SocketManager.ManagerState managerState, Exception innerException = null, [CallerMemberName] string origin = null)
{
IdentifyFailureType(innerException, ref failureType);
managerState = SocketManager.ManagerState.RecordConnectionFailed_OnInternalError;
if (failureType == ConnectionFailureType.InternalFailure) OnInternalError(innerException, origin);
// stop anything new coming in...
Bridge.Trace("Failed: " + failureType);
int @in = -1, ar = -1;
managerState = SocketManager.ManagerState.RecordConnectionFailed_OnDisconnected;
Bridge.OnDisconnected(failureType, this, out bool isCurrent, out PhysicalBridge.State oldState);
if (oldState == PhysicalBridge.State.ConnectedEstablished)
{
try
{
@in = GetAvailableInboundBytes(out ar);
}
catch { /* best effort only */ }
}
if (isCurrent && Interlocked.CompareExchange(ref failureReported, 1, 0) == 0)
{
managerState = SocketManager.ManagerState.RecordConnectionFailed_ReportFailure;
int now = Environment.TickCount, lastRead = VolatileWrapper.Read(ref lastReadTickCount), lastWrite = VolatileWrapper.Read(ref lastWriteTickCount),
lastBeat = VolatileWrapper.Read(ref lastBeatTickCount);
int unansweredRead = VolatileWrapper.Read(ref firstUnansweredWriteTickCount);
var exMessage = new StringBuilder(failureType + " on " + Format.ToString(Bridge.ServerEndPoint.EndPoint) + "/" + connectionType);
var data = new List<Tuple<string, string>>
{
Tuple.Create("FailureType", failureType.ToString()),
Tuple.Create("EndPoint", Format.ToString(Bridge.ServerEndPoint.EndPoint))
};
void add(string lk, string sk, string v)
{
data.Add(Tuple.Create(lk, v));
exMessage.Append(", ").Append(sk).Append(": ").Append(v);
}
add("Origin", "origin", origin);
add("Input-Buffer", "input-buffer", ioBufferBytes.ToString());
add("Outstanding-Responses", "outstanding", GetSentAwaitingResponseCount().ToString());
add("Last-Read", "last-read", (unchecked(now - lastRead) / 1000) + "s ago");
add("Last-Write", "last-write", (unchecked(now - lastWrite) / 1000) + "s ago");
add("Unanswered-Write", "unanswered-write", (unchecked(now - unansweredRead) / 1000) + "s ago");
add("Keep-Alive", "keep-alive", Bridge.ServerEndPoint.WriteEverySeconds + "s");
add("Pending", "pending", Bridge.GetPendingCount().ToString());
add("Previous-Physical-State", "state", oldState.ToString());
if (@in >= 0)
{
add("Inbound-Bytes", "in", @in.ToString());
add("Active-Readers", "ar", ar.ToString());
}
add("Last-Heartbeat", "last-heartbeat", (lastBeat == 0 ? "never" : ((unchecked(now - lastBeat) / 1000) + "s ago")) + (Bridge.IsBeating ? " (mid-beat)" : ""));
add("Last-Multiplexer-Heartbeat", "last-mbeat", Multiplexer.LastHeartbeatSecondsAgo + "s ago");
add("Last-Global-Heartbeat", "global", ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago");
#if FEATURE_SOCKET_MODE_POLL
var mgr = Bridge.Multiplexer.SocketManager;
add("SocketManager-State", "mgr", mgr.State.ToString());
add("Last-Error", "err", mgr.LastErrorTimeRelative());
#endif
var ex = innerException == null
? new RedisConnectionException(failureType, exMessage.ToString())
: new RedisConnectionException(failureType, exMessage.ToString(), innerException);
foreach (var kv in data)
{
ex.Data["Redis-" + kv.Item1] = kv.Item2;
}
managerState = SocketManager.ManagerState.RecordConnectionFailed_OnConnectionFailed;
Bridge.OnConnectionFailed(this, failureType, ex);
}
// cleanup
managerState = SocketManager.ManagerState.RecordConnectionFailed_FailOutstanding;
lock (outstanding)
{
Bridge.Trace(outstanding.Count != 0, "Failing outstanding messages: " + outstanding.Count);
while (outstanding.Count != 0)
{
var next = outstanding.Dequeue();
Bridge.Trace("Failing: " + next);
next.Fail(failureType, innerException);
Bridge.CompleteSyncOrAsync(next);
}
}
// burn the socket
managerState = SocketManager.ManagerState.RecordConnectionFailed_ShutdownSocket;
Multiplexer.SocketManager?.Shutdown(socketToken);
}
public override string ToString()
{
return physicalName;
}
internal static void IdentifyFailureType(Exception exception, ref ConnectionFailureType failureType)
{
if (exception != null && failureType == ConnectionFailureType.InternalFailure)
{
if (exception is AggregateException) exception = exception.InnerException ?? exception;
if (exception is AuthenticationException) failureType = ConnectionFailureType.AuthenticationFailure;
else if (exception is EndOfStreamException) failureType = ConnectionFailureType.SocketClosed;
else if (exception is SocketException || exception is IOException) failureType = ConnectionFailureType.SocketFailure;
else if (exception is ObjectDisposedException) failureType = ConnectionFailureType.SocketClosed;
}
}
internal void Enqueue(Message next)
{
lock (outstanding)
{
outstanding.Enqueue(next);
}
}
internal void GetCounters(ConnectionCounters counters)
{
lock (outstanding)
{
counters.SentItemsAwaitingResponse = outstanding.Count;
}
counters.Subscriptions = SubscriptionCount;
}
internal Message GetReadModeCommand(bool isMasterOnly)
{
var serverEndpoint = Bridge.ServerEndPoint;
if (serverEndpoint.RequiresReadMode)
{
ReadMode requiredReadMode = isMasterOnly ? ReadMode.ReadWrite : ReadMode.ReadOnly;
if (requiredReadMode != currentReadMode)
{
currentReadMode = requiredReadMode;
switch (requiredReadMode)
{
case ReadMode.ReadOnly: return ReusableReadOnlyCommand;
case ReadMode.ReadWrite: return ReusableReadWriteCommand;
}
}
}
else if (currentReadMode == ReadMode.ReadOnly)
{ // we don't need it (because we're not a cluster, or not a slave),
// but we are in read-only mode; switch to read-write
currentReadMode = ReadMode.ReadWrite;
return ReusableReadWriteCommand;
}
return null;
}
internal Message GetSelectDatabaseCommand(int targetDatabase, Message message)
{
if (targetDatabase < 0) return null;
if (targetDatabase != currentDatabase)
{
var serverEndpoint = Bridge.ServerEndPoint;
int available = serverEndpoint.Databases;
if (!serverEndpoint.HasDatabases) // only db0 is available on cluster/twemproxy
{
if (targetDatabase != 0)
{ // should never see this, since the API doesn't allow it; thus not too worried about ExceptionFactory
throw new RedisCommandException("Multiple databases are not supported on this server; cannot switch to database: " + targetDatabase);
}
return null;
}
if (message.Command == RedisCommand.SELECT)
{
// this could come from an EVAL/EVALSHA inside a transaction, for example; we'll accept it
Bridge.Trace("Switching database: " + targetDatabase);
currentDatabase = targetDatabase;
return null;
}
if (TransactionActive)
{// should never see this, since the API doesn't allow it; thus not too worried about ExceptionFactory
throw new RedisCommandException("Multiple databases inside a transaction are not currently supported: " + targetDatabase);
}
if (available != 0 && targetDatabase >= available) // we positively know it is out of range
{
throw ExceptionFactory.DatabaseOutfRange(Multiplexer.IncludeDetailInExceptions, targetDatabase, message, serverEndpoint);
}
Bridge.Trace("Switching database: " + targetDatabase);
currentDatabase = targetDatabase;
return GetSelectDatabaseCommand(targetDatabase);
}
return null;
}
internal static Message GetSelectDatabaseCommand(int targetDatabase)
{
return targetDatabase < DefaultRedisDatabaseCount
? ReusableChangeDatabaseCommands[targetDatabase] // 0-15 by default
: Message.Create(targetDatabase, CommandFlags.FireAndForget, RedisCommand.SELECT);
}
internal int GetSentAwaitingResponseCount()
{
lock (outstanding)
{
return outstanding.Count;
}
}
internal void GetStormLog(StringBuilder sb)
{
lock (outstanding)
{
if (outstanding.Count == 0) return;
sb.Append("Sent, awaiting response from server: ").Append(outstanding.Count).AppendLine();
int total = 0;
foreach (var item in outstanding)
{
if (++total >= 500) break;
item.AppendStormLog(sb);
sb.AppendLine();
}
}
}
internal void OnHeartbeat()
{
Interlocked.Exchange(ref lastBeatTickCount, Environment.TickCount);
}
internal void OnInternalError(Exception exception, [CallerMemberName] string origin = null)
{
Multiplexer.OnInternalError(exception, Bridge.ServerEndPoint.EndPoint, connectionType, origin);
}
internal void SetUnknownDatabase()
{ // forces next db-specific command to issue a select
currentDatabase = -1;
}
internal void Write(RedisKey key)
{
var val = key.KeyValue;
if (val is string)
{
WriteUnified(outStream, key.KeyPrefix, (string)val);
}
else
{
WriteUnified(outStream, key.KeyPrefix, (byte[])val);
}
}
internal void Write(RedisChannel channel)
{
WriteUnified(outStream, ChannelPrefix, channel.Value);
}
internal void Write(RedisValue value)
{
if (value.IsInteger)
{
WriteUnified(outStream, (long)value);
}
else
{
WriteUnified(outStream, (byte[])value);
}
}
internal void WriteHeader(RedisCommand command, int arguments)
{
var commandBytes = Multiplexer.CommandMap.GetBytes(command);
if (commandBytes == null)
{
throw ExceptionFactory.CommandDisabled(Multiplexer.IncludeDetailInExceptions, command, null, Bridge.ServerEndPoint);
}
outStream.WriteByte((byte)'*');
// remember the time of the first write that still not followed by read
Interlocked.CompareExchange(ref firstUnansweredWriteTickCount, Environment.TickCount, 0);
WriteRaw(outStream, arguments + 1);
WriteUnified(outStream, commandBytes);
}
internal const int REDIS_MAX_ARGS = 1024 * 1024; // there is a <= 1024*1024 max constraint inside redis itself: https://github.com/antirez/redis/blob/6c60526db91e23fb2d666fc52facc9a11780a2a3/src/networking.c#L1024
internal void WriteHeader(string command, int arguments)
{
if (arguments >= REDIS_MAX_ARGS) // using >= here because we will be adding 1 for the command itself (which is an arg for the purposes of the multi-bulk protocol)
{
throw ExceptionFactory.TooManyArgs(Multiplexer.IncludeDetailInExceptions, command, null, Bridge.ServerEndPoint, arguments + 1);
}
var commandBytes = Multiplexer.CommandMap.GetBytes(command);
if (commandBytes == null)
{
throw ExceptionFactory.CommandDisabled(Multiplexer.IncludeDetailInExceptions, command, null, Bridge.ServerEndPoint);
}
outStream.WriteByte((byte)'*');
// remember the time of the first write that still not followed by read
Interlocked.CompareExchange(ref firstUnansweredWriteTickCount, Environment.TickCount, 0);
WriteRaw(outStream, arguments + 1);
WriteUnified(outStream, commandBytes);
}
private static void WriteRaw(Stream stream, long value, bool withLengthPrefix = false)
{
if (value >= 0 && value <= 9)
{
if (withLengthPrefix)
{
stream.WriteByte((byte)'1');
stream.Write(Crlf, 0, 2);
}
stream.WriteByte((byte)((int)'0' + (int)value));
}
else if (value >= 10 && value < 100)
{
if (withLengthPrefix)
{
stream.WriteByte((byte)'2');
stream.Write(Crlf, 0, 2);
}
stream.WriteByte((byte)((int)'0' + ((int)value / 10)));
stream.WriteByte((byte)((int)'0' + ((int)value % 10)));
}
else if (value >= 100 && value < 1000)
{
int v = (int)value;
int units = v % 10;
v /= 10;
int tens = v % 10, hundreds = v / 10;
if (withLengthPrefix)
{
stream.WriteByte((byte)'3');
stream.Write(Crlf, 0, 2);
}
stream.WriteByte((byte)((int)'0' + hundreds));
stream.WriteByte((byte)((int)'0' + tens));
stream.WriteByte((byte)((int)'0' + units));
}
else if (value < 0 && value >= -9)
{
if (withLengthPrefix)
{
stream.WriteByte((byte)'2');
stream.Write(Crlf, 0, 2);
}
stream.WriteByte((byte)'-');
stream.WriteByte((byte)((int)'0' - (int)value));
}
else if (value <= -10 && value > -100)
{
if (withLengthPrefix)
{
stream.WriteByte((byte)'3');
stream.Write(Crlf, 0, 2);
}
value = -value;
stream.WriteByte((byte)'-');
stream.WriteByte((byte)((int)'0' + ((int)value / 10)));
stream.WriteByte((byte)((int)'0' + ((int)value % 10)));
}
else
{
var bytes = Encoding.ASCII.GetBytes(Format.ToString(value));
if (withLengthPrefix)
{
WriteRaw(stream, bytes.Length, false);
}
stream.Write(bytes, 0, bytes.Length);
}
stream.Write(Crlf, 0, 2);
}
private static void WriteUnified(Stream stream, byte[] value)
{
stream.WriteByte((byte)'$');
if (value == null)
{
WriteRaw(stream, -1); // note that not many things like this...
}
else
{
WriteRaw(stream, value.Length);
stream.Write(value, 0, value.Length);
stream.Write(Crlf, 0, 2);
}
}
internal void WriteAsHex(byte[] value)
{
var stream = outStream;
stream.WriteByte((byte)'$');
if (value == null)
{
WriteRaw(stream, -1);
}
else
{
WriteRaw(stream, value.Length * 2);
for (int i = 0; i < value.Length; i++)
{
stream.WriteByte(ToHexNibble(value[i] >> 4));
stream.WriteByte(ToHexNibble(value[i] & 15));
}
stream.Write(Crlf, 0, 2);
}
}
internal static byte ToHexNibble(int value)
{
return value < 10 ? (byte)('0' + value) : (byte)('a' - 10 + value);
}
private void WriteUnified(Stream stream, byte[] prefix, string value)
{
stream.WriteByte((byte)'$');
if (value == null)
{
WriteRaw(stream, -1); // note that not many things like this...
}
else
{
int encodedLength = Encoding.UTF8.GetByteCount(value);
if (prefix == null)
{
WriteRaw(stream, encodedLength);
WriteRaw(stream, value, encodedLength);
stream.Write(Crlf, 0, 2);
}
else
{
WriteRaw(stream, prefix.Length + encodedLength);
stream.Write(prefix, 0, prefix.Length);
WriteRaw(stream, value, encodedLength);
stream.Write(Crlf, 0, 2);
}
}
}
private unsafe void WriteRaw(Stream stream, string value, int encodedLength)
{
if (encodedLength <= ScratchSize)
{
int bytes = Encoding.UTF8.GetBytes(value, 0, value.Length, outScratch, 0);
stream.Write(outScratch, 0, bytes);
}
else
{
#if NETSTANDARD1_5
int charsRemaining = value.Length, charOffset = 0, bytesWritten;
var valueCharArray = value.ToCharArray();
while (charsRemaining > Scratch_CharsPerBlock)
{
bytesWritten = outEncoder.GetBytes(valueCharArray, charOffset, Scratch_CharsPerBlock, outScratch, 0, false);
stream.Write(outScratch, 0, bytesWritten);
charOffset += Scratch_CharsPerBlock;
charsRemaining -= Scratch_CharsPerBlock;
}
bytesWritten = outEncoder.GetBytes(valueCharArray, charOffset, charsRemaining, outScratch, 0, true);
if (bytesWritten != 0) stream.Write(outScratch, 0, bytesWritten);
#else
fixed (char* c = value)
fixed (byte* b = outScratch)
{
int charsRemaining = value.Length, charOffset = 0, bytesWritten;
while (charsRemaining > Scratch_CharsPerBlock)
{
bytesWritten = outEncoder.GetBytes(c + charOffset, Scratch_CharsPerBlock, b, ScratchSize, false);
stream.Write(outScratch, 0, bytesWritten);
charOffset += Scratch_CharsPerBlock;
charsRemaining -= Scratch_CharsPerBlock;
}
bytesWritten = outEncoder.GetBytes(c + charOffset, charsRemaining, b, ScratchSize, true);
if (bytesWritten != 0) stream.Write(outScratch, 0, bytesWritten);
}
#endif
}
}
private const int ScratchSize = 512;
private static readonly int Scratch_CharsPerBlock = ScratchSize / Encoding.UTF8.GetMaxByteCount(1);
private readonly byte[] outScratch = new byte[ScratchSize];
private readonly Encoder outEncoder = Encoding.UTF8.GetEncoder();
private static void WriteUnified(Stream stream, byte[] prefix, byte[] value)
{
stream.WriteByte((byte)'$');
if (value == null)
{
WriteRaw(stream, -1); // note that not many things like this...
}
else if (prefix == null)
{
WriteRaw(stream, value.Length);
stream.Write(value, 0, value.Length);
stream.Write(Crlf, 0, 2);
}
else
{
WriteRaw(stream, prefix.Length + value.Length);
stream.Write(prefix, 0, prefix.Length);
stream.Write(value, 0, value.Length);
stream.Write(Crlf, 0, 2);
}
}
private static void WriteUnified(Stream stream, long value)
{
// note from specification: A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
// (i.e. we can't just send ":123\r\n", we need to send "$3\r\n123\r\n"
stream.WriteByte((byte)'$');
WriteRaw(stream, value, withLengthPrefix: true);
}
private void BeginReading()
{
bool keepReading;
try
{
do
{
keepReading = false;
int space = EnsureSpaceAndComputeBytesToRead();
Multiplexer.Trace("Beginning async read...", physicalName);
#if NETSTANDARD1_5
var result = netStream.ReadAsync(ioBuffer, ioBufferBytes, space);
switch (result.Status)
{
case TaskStatus.RanToCompletion:
case TaskStatus.Faulted:
Multiplexer.Trace("Completed synchronously: processing immediately", physicalName);
keepReading = EndReading(result);
break;
default:
result.ContinueWith(endRead);
break;
}
#else
var result = netStream.BeginRead(ioBuffer, ioBufferBytes, space, endRead, this);
if (result.CompletedSynchronously)
{
Multiplexer.Trace("Completed synchronously: processing immediately", physicalName);
keepReading = EndReading(result);
}
#endif
} while (keepReading);
}
#if NETSTANDARD1_5
catch (AggregateException ex)
{
throw ex.InnerException;
}
#endif
catch (System.IO.IOException ex)
{
Multiplexer.Trace("Could not connect: " + ex.Message, physicalName);
}
}
private int haveReader;
internal int GetAvailableInboundBytes(out int activeReaders)
{
activeReaders = Interlocked.CompareExchange(ref haveReader, 0, 0);
return socketToken.Available;
}
private static LocalCertificateSelectionCallback GetAmbientCertificateCallback()
{
try
{
var pfxPath = Environment.GetEnvironmentVariable("SERedis_ClientCertPfxPath");
var pfxPassword = Environment.GetEnvironmentVariable("SERedis_ClientCertPassword");
var pfxStorageFlags = Environment.GetEnvironmentVariable("SERedis_ClientCertStorageFlags");
X509KeyStorageFlags? flags = null;
if (!string.IsNullOrEmpty(pfxStorageFlags))
{
flags = Enum.Parse(typeof(X509KeyStorageFlags), pfxStorageFlags) as X509KeyStorageFlags?;
}
if (!string.IsNullOrEmpty(pfxPath) && File.Exists(pfxPath))
{
return delegate { return new X509Certificate2(pfxPath, pfxPassword ?? "", flags ?? X509KeyStorageFlags.DefaultKeySet); };
}
}
catch
{ }
return null;
}
SocketMode ISocketCallback.Connected(Stream stream, TextWriter log)
{
try
{
var socketMode = SocketManager.DefaultSocketMode;
// disallow connection in some cases
OnDebugAbort();
// the order is important here:
// [network]<==[ssl]<==[logging]<==[buffered]
var config = Multiplexer.RawConfig;
if (config.Ssl)
{
Multiplexer.LogLocked(log, "Configuring SSL");
var host = config.SslHost;
if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(Bridge.ServerEndPoint.EndPoint);
var ssl = new SslStream(stream, false, config.CertificateValidationCallback,
config.CertificateSelectionCallback ?? GetAmbientCertificateCallback()
#if !__MonoCS__
, EncryptionPolicy.RequireEncryption
#endif
);
try
{
ssl.AuthenticateAsClient(host, config.SslProtocols);
Multiplexer.LogLocked(log, $"SSL connection established successfully using protocol: {ssl.SslProtocol}");
}
catch (AuthenticationException authexception)
{
RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure, authexception);
Multiplexer.Trace("Encryption failure");
return SocketMode.Abort;
}
stream = ssl;
socketMode = SocketMode.Async;
}
OnWrapForLogging(ref stream, physicalName);
int bufferSize = config.WriteBuffer;
netStream = stream;
outStream = bufferSize <= 0 ? stream : new BufferedStream(stream, bufferSize);
Multiplexer.LogLocked(log, "Connected {0}", Bridge);
Bridge.OnConnected(this, log);
return socketMode;
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); // includes a bridge.OnDisconnected
Multiplexer.Trace("Could not connect: " + ex.Message, physicalName);
return SocketMode.Abort;
}
}
#if NETSTANDARD1_5
private bool EndReading(Task<int> result)
{
try
{
var tmp = netStream;
int bytesRead = tmp == null ? 0 : result.Result; // note we expect this to be completed
return ProcessReadBytes(bytesRead);
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
return false;
}
}
#else
private bool EndReading(IAsyncResult result)
{
try
{
int bytesRead = netStream?.EndRead(result) ?? 0;
return ProcessReadBytes(bytesRead);
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
return false;
}
}
#endif
private int EnsureSpaceAndComputeBytesToRead()
{
int space = ioBuffer.Length - ioBufferBytes;
if (space == 0)
{
Array.Resize(ref ioBuffer, ioBuffer.Length * 2);
space = ioBuffer.Length - ioBufferBytes;
}
return space;
}
void ISocketCallback.Error()
{
RecordConnectionFailed(ConnectionFailureType.SocketFailure);
}
private void MatchResult(RawResult result)
{
// check to see if it could be an out-of-band pubsub message
if (connectionType == ConnectionType.Subscription && result.Type == ResultType.MultiBulk)
{ // out of band message does not match to a queued message
var items = result.GetItems();
if (items.Length >= 3 && items[0].IsEqual(message))
{
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = Multiplexer.ConfigurationChangedChannel;
if (configChanged != null && items[1].IsEqual(configChanged))
{
EndPoint blame = null;
try
{
if (!items[2].IsEqual(RedisLiterals.ByteWildcard))
{
blame = Format.TryParseEndPoint(items[2].GetString());
}
}
catch { /* no biggie */ }
Multiplexer.Trace("Configuration changed: " + Format.ToString(blame), physicalName);
Multiplexer.ReconfigureIfNeeded(blame, true, "broadcast");
}
// invoke the handlers
var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
Multiplexer.Trace("MESSAGE: " + channel, physicalName);
if (!channel.IsNull)
{
Multiplexer.OnMessage(channel, channel, items[2].AsRedisValue());
}
return; // AND STOP PROCESSING!
}
else if (items.Length >= 4 && items[0].IsEqual(pmessage))
{
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
Multiplexer.Trace("PMESSAGE: " + channel, physicalName);
if (!channel.IsNull)
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
Multiplexer.OnMessage(sub, channel, items[3].AsRedisValue());
}
return; // AND STOP PROCESSING!
}
// if it didn't look like "[p]message", then we still need to process the pending queue
}
Multiplexer.Trace("Matching result...", physicalName);
Message msg;
lock (outstanding)
{
Multiplexer.Trace(outstanding.Count == 0, "Nothing to respond to!", physicalName);
msg = outstanding.Dequeue();
}
Multiplexer.Trace("Response to: " + msg.ToString(), physicalName);
if (msg.ComputeResult(this, result))
{
Bridge.CompleteSyncOrAsync(msg);
}
}
partial void OnCloseEcho();
partial void OnCreateEcho();
partial void OnDebugAbort();
void ISocketCallback.OnHeartbeat()
{
try
{
Bridge.OnHeartbeat(true); // all the fun code is here
}
catch (Exception ex)
{
OnInternalError(ex);
}
}
partial void OnWrapForLogging(ref Stream stream, string name);
private int ProcessBuffer(byte[] underlying, ref int offset, ref int count)
{
int messageCount = 0;
RawResult result;
do
{
int tmpOffset = offset, tmpCount = count;
// we want TryParseResult to be able to mess with these without consequence
result = TryParseResult(underlying, ref tmpOffset, ref tmpCount);
if (result.HasValue)
{
messageCount++;
// entire message: update the external counters
offset = tmpOffset;
count = tmpCount;
Multiplexer.Trace(result.ToString(), physicalName);
MatchResult(result);
}
} while (result.HasValue);
return messageCount;
}
private bool ProcessReadBytes(int bytesRead)
{
if (bytesRead <= 0)
{
Multiplexer.Trace("EOF", physicalName);
RecordConnectionFailed(ConnectionFailureType.SocketClosed);
return false;
}
Interlocked.Exchange(ref lastReadTickCount, Environment.TickCount);
// reset unanswered write timestamp
VolatileWrapper.Write(ref firstUnansweredWriteTickCount, 0);
ioBufferBytes += bytesRead;
Multiplexer.Trace("More bytes available: " + bytesRead + " (" + ioBufferBytes + ")", physicalName);
int offset = 0, count = ioBufferBytes;
int handled = ProcessBuffer(ioBuffer, ref offset, ref count);
Multiplexer.Trace("Processed: " + handled, physicalName);
if (handled != 0)
{
// read stuff
if (count != 0)
{
Multiplexer.Trace("Copying remaining bytes: " + count, physicalName);
// if anything was left over, we need to copy it to
// the start of the buffer so it can be used next time
Buffer.BlockCopy(ioBuffer, offset, ioBuffer, 0, count);
}
ioBufferBytes = count;
}
return true;
}
void ISocketCallback.Read()
{
Interlocked.Increment(ref haveReader);
try
{
do
{
int space = EnsureSpaceAndComputeBytesToRead();
int bytesRead = netStream?.Read(ioBuffer, ioBufferBytes, space) ?? 0;
if (!ProcessReadBytes(bytesRead)) return; // EOF
} while (socketToken.Available != 0);
Multiplexer.Trace("Buffer exhausted", physicalName);
// ^^^ note that the socket manager will call us again when there is something to do
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
finally
{
Interlocked.Decrement(ref haveReader);
}
}
bool ISocketCallback.IsDataAvailable
{
get
{
try { return socketToken.Available > 0; }
catch { return false; }
}
}
private RawResult ReadArray(byte[] buffer, ref int offset, ref int count)
{
var itemCount = ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
if (itemCount.HasValue)
{
if (!itemCount.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid array length", Bridge.ServerEndPoint);
int itemCountActual = checked((int)i64);
if (itemCountActual < 0)
{
//for null response by command like EXEC, RESP array: *-1\r\n
return new RawResult(ResultType.SimpleString, null, 0, 0);
}
else if (itemCountActual == 0)
{
//for zero array response by command like SCAN, Resp array: *0\r\n
return RawResult.EmptyArray;
}
var arr = new RawResult[itemCountActual];
for (int i = 0; i < itemCountActual; i++)
{
if (!(arr[i] = TryParseResult(buffer, ref offset, ref count)).HasValue)
return RawResult.Nil;
}
return new RawResult(arr);
}
return RawResult.Nil;
}
private RawResult ReadBulkString(byte[] buffer, ref int offset, ref int count)
{
var prefix = ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
if (prefix.HasValue)
{
if (!prefix.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string length", Bridge.ServerEndPoint);
int bodySize = checked((int)i64);
if (bodySize < 0)
{
return new RawResult(ResultType.BulkString, null, 0, 0);
}
else if (count >= bodySize + 2)
{
if (buffer[offset + bodySize] != '\r' || buffer[offset + bodySize + 1] != '\n')
{
throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string terminator", Bridge.ServerEndPoint);
}
var result = new RawResult(ResultType.BulkString, buffer, offset, bodySize);
offset += bodySize + 2;
count -= bodySize + 2;
return result;
}
}
return RawResult.Nil;
}
private RawResult ReadLineTerminatedString(ResultType type, byte[] buffer, ref int offset, ref int count)
{
int max = offset + count - 2;
for (int i = offset; i < max; i++)
{
if (buffer[i + 1] == '\r' && buffer[i + 2] == '\n')
{
int len = i - offset + 1;
var result = new RawResult(type, buffer, offset, len);
count -= (len + 2);
offset += (len + 2);
return result;
}
}
return RawResult.Nil;
}
void ISocketCallback.StartReading()
{
BeginReading();
}
private RawResult TryParseResult(byte[] buffer, ref int offset, ref int count)
{
if (count == 0) return RawResult.Nil;
char resultType = (char)buffer[offset++];
count--;
switch (resultType)
{
case '+': // simple string
return ReadLineTerminatedString(ResultType.SimpleString, buffer, ref offset, ref count);
case '-': // error
return ReadLineTerminatedString(ResultType.Error, buffer, ref offset, ref count);
case ':': // integer
return ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
case '$': // bulk string
return ReadBulkString(buffer, ref offset, ref count);
case '*': // array
return ReadArray(buffer, ref offset, ref count);
default:
throw new InvalidOperationException("Unexpected response prefix: " + (char)resultType);
}
}
partial void DebugEmulateStaleConnection(ref int firstUnansweredWrite);
public void CheckForStaleConnection(ref SocketManager.ManagerState state)
{
int firstUnansweredWrite = VolatileWrapper.Read(ref firstUnansweredWriteTickCount);
DebugEmulateStaleConnection(ref firstUnansweredWrite);
int now = Environment.TickCount;
if (firstUnansweredWrite != 0 && (now - firstUnansweredWrite) > Multiplexer.RawConfig.ResponseTimeout)
{
RecordConnectionFailed(ConnectionFailureType.SocketFailure, ref state, origin: "CheckForStaleConnection");
}
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
#if NETSTANDARD1_5
using System.Threading.Tasks;
#endif
namespace StackExchange.Redis
{
internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback
{
internal readonly byte[] ChannelPrefix;
private const int DefaultRedisDatabaseCount = 16;
private static readonly byte[] Crlf = Encoding.ASCII.GetBytes("\r\n");
#if NETSTANDARD1_5
private readonly Action<Task<int>> endRead;
private static Action<Task<int>> EndReadFactory(PhysicalConnection physical)
{
return result =>
{ // can't capture AsyncState on SocketRead, so we'll do it once per physical instead
if (result.IsFaulted)
{
GC.KeepAlive(result.Exception);
}
try
{
physical.Multiplexer.Trace("Completed asynchronously: processing in callback", physical.physicalName);
if (physical.EndReading(result)) physical.BeginReading();
}
catch (Exception ex)
{
physical.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
};
}
#else
private static readonly AsyncCallback endRead = result =>
{
PhysicalConnection physical;
if (result.CompletedSynchronously || (physical = result.AsyncState as PhysicalConnection) == null) return;
try
{
physical.Multiplexer.Trace("Completed asynchronously: processing in callback", physical.physicalName);
if (physical.EndReading(result)) physical.BeginReading();
}
catch (Exception ex)
{
physical.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
};
#endif
private static readonly byte[] message = Encoding.UTF8.GetBytes("message"), pmessage = Encoding.UTF8.GetBytes("pmessage");
private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
private static readonly Message
ReusableReadOnlyCommand = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.READONLY),
ReusableReadWriteCommand = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.READWRITE);
private static int totalCount;
private readonly ConnectionType connectionType;
// things sent to this physical, but not yet received
private readonly Queue<Message> outstanding = new Queue<Message>();
private readonly string physicalName;
private volatile int currentDatabase = 0;
private ReadMode currentReadMode = ReadMode.NotSpecified;
private int failureReported;
private byte[] ioBuffer = new byte[512];
private int ioBufferBytes = 0;
private int lastWriteTickCount, lastReadTickCount, lastBeatTickCount;
private int firstUnansweredWriteTickCount;
private Stream netStream, outStream;
private SocketToken socketToken;
public PhysicalConnection(PhysicalBridge bridge)
{
lastWriteTickCount = lastReadTickCount = Environment.TickCount;
lastBeatTickCount = 0;
connectionType = bridge.ConnectionType;
Multiplexer = bridge.Multiplexer;
ChannelPrefix = Multiplexer.RawConfig.ChannelPrefix;
if (ChannelPrefix?.Length == 0) ChannelPrefix = null; // null tests are easier than null+empty
var endpoint = bridge.ServerEndPoint.EndPoint;
physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + Format.ToString(endpoint);
Bridge = bridge;
#if NETSTANDARD1_5
endRead = EndReadFactory(this);
#endif
OnCreateEcho();
}
public void BeginConnect(TextWriter log)
{
VolatileWrapper.Write(ref firstUnansweredWriteTickCount, 0);
var endpoint = Bridge.ServerEndPoint.EndPoint;
Multiplexer.Trace("Connecting...", physicalName);
socketToken = Multiplexer.SocketManager.BeginConnect(endpoint, this, Multiplexer, log);
}
private enum ReadMode : byte
{
NotSpecified,
ReadOnly,
ReadWrite
}
public PhysicalBridge Bridge { get; }
public long LastWriteSecondsAgo => unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastWriteTickCount)) / 1000;
public ConnectionMultiplexer Multiplexer { get; }
public long SubscriptionCount { get; set; }
public bool TransactionActive { get; internal set; }
public void Dispose()
{
if (outStream != null)
{
Multiplexer.Trace("Disconnecting...", physicalName);
#if !NETSTANDARD1_5
try { outStream.Close(); } catch { }
#endif
try { outStream.Dispose(); } catch { }
outStream = null;
}
if (netStream != null)
{
#if !NETSTANDARD1_5
try { netStream.Close(); } catch { }
#endif
try { netStream.Dispose(); } catch { }
netStream = null;
}
if (socketToken.HasValue)
{
Multiplexer.SocketManager?.Shutdown(socketToken);
socketToken = default(SocketToken);
Multiplexer.Trace("Disconnected", physicalName);
RecordConnectionFailed(ConnectionFailureType.ConnectionDisposed);
}
OnCloseEcho();
}
public void Flush()
{
var tmp = outStream;
if (tmp != null)
{
tmp.Flush();
Interlocked.Exchange(ref lastWriteTickCount, Environment.TickCount);
}
}
public void RecordConnectionFailed(ConnectionFailureType failureType, Exception innerException = null, [CallerMemberName] string origin = null)
{
var mgrState = SocketManager.ManagerState.CheckForStaleConnections;
RecordConnectionFailed(failureType, ref mgrState, innerException, origin);
}
public void RecordConnectionFailed(ConnectionFailureType failureType, ref SocketManager.ManagerState managerState, Exception innerException = null, [CallerMemberName] string origin = null)
{
IdentifyFailureType(innerException, ref failureType);
managerState = SocketManager.ManagerState.RecordConnectionFailed_OnInternalError;
if (failureType == ConnectionFailureType.InternalFailure) OnInternalError(innerException, origin);
// stop anything new coming in...
Bridge.Trace("Failed: " + failureType);
int @in = -1, ar = -1;
managerState = SocketManager.ManagerState.RecordConnectionFailed_OnDisconnected;
Bridge.OnDisconnected(failureType, this, out bool isCurrent, out PhysicalBridge.State oldState);
if (oldState == PhysicalBridge.State.ConnectedEstablished)
{
try
{
@in = GetAvailableInboundBytes(out ar);
}
catch { /* best effort only */ }
}
if (isCurrent && Interlocked.CompareExchange(ref failureReported, 1, 0) == 0)
{
managerState = SocketManager.ManagerState.RecordConnectionFailed_ReportFailure;
int now = Environment.TickCount, lastRead = VolatileWrapper.Read(ref lastReadTickCount), lastWrite = VolatileWrapper.Read(ref lastWriteTickCount),
lastBeat = VolatileWrapper.Read(ref lastBeatTickCount);
int unansweredRead = VolatileWrapper.Read(ref firstUnansweredWriteTickCount);
var exMessage = new StringBuilder(failureType + " on " + Format.ToString(Bridge.ServerEndPoint.EndPoint) + "/" + connectionType);
var data = new List<Tuple<string, string>>
{
Tuple.Create("FailureType", failureType.ToString()),
Tuple.Create("EndPoint", Format.ToString(Bridge.ServerEndPoint.EndPoint))
};
void add(string lk, string sk, string v)
{
data.Add(Tuple.Create(lk, v));
exMessage.Append(", ").Append(sk).Append(": ").Append(v);
}
add("Origin", "origin", origin);
add("Input-Buffer", "input-buffer", ioBufferBytes.ToString());
add("Outstanding-Responses", "outstanding", GetSentAwaitingResponseCount().ToString());
add("Last-Read", "last-read", (unchecked(now - lastRead) / 1000) + "s ago");
add("Last-Write", "last-write", (unchecked(now - lastWrite) / 1000) + "s ago");
add("Unanswered-Write", "unanswered-write", (unchecked(now - unansweredRead) / 1000) + "s ago");
add("Keep-Alive", "keep-alive", Bridge.ServerEndPoint.WriteEverySeconds + "s");
add("Pending", "pending", Bridge.GetPendingCount().ToString());
add("Previous-Physical-State", "state", oldState.ToString());
if (@in >= 0)
{
add("Inbound-Bytes", "in", @in.ToString());
add("Active-Readers", "ar", ar.ToString());
}
add("Last-Heartbeat", "last-heartbeat", (lastBeat == 0 ? "never" : ((unchecked(now - lastBeat) / 1000) + "s ago")) + (Bridge.IsBeating ? " (mid-beat)" : ""));
add("Last-Multiplexer-Heartbeat", "last-mbeat", Multiplexer.LastHeartbeatSecondsAgo + "s ago");
add("Last-Global-Heartbeat", "global", ConnectionMultiplexer.LastGlobalHeartbeatSecondsAgo + "s ago");
#if FEATURE_SOCKET_MODE_POLL
var mgr = Bridge.Multiplexer.SocketManager;
add("SocketManager-State", "mgr", mgr.State.ToString());
add("Last-Error", "err", mgr.LastErrorTimeRelative());
#endif
var ex = innerException == null
? new RedisConnectionException(failureType, exMessage.ToString())
: new RedisConnectionException(failureType, exMessage.ToString(), innerException);
foreach (var kv in data)
{
ex.Data["Redis-" + kv.Item1] = kv.Item2;
}
managerState = SocketManager.ManagerState.RecordConnectionFailed_OnConnectionFailed;
Bridge.OnConnectionFailed(this, failureType, ex);
}
// cleanup
managerState = SocketManager.ManagerState.RecordConnectionFailed_FailOutstanding;
lock (outstanding)
{
Bridge.Trace(outstanding.Count != 0, "Failing outstanding messages: " + outstanding.Count);
while (outstanding.Count != 0)
{
var next = outstanding.Dequeue();
Bridge.Trace("Failing: " + next);
next.Fail(failureType, innerException);
Bridge.CompleteSyncOrAsync(next);
}
}
// burn the socket
managerState = SocketManager.ManagerState.RecordConnectionFailed_ShutdownSocket;
Multiplexer.SocketManager?.Shutdown(socketToken);
}
public override string ToString()
{
return physicalName;
}
internal static void IdentifyFailureType(Exception exception, ref ConnectionFailureType failureType)
{
if (exception != null && failureType == ConnectionFailureType.InternalFailure)
{
if (exception is AggregateException) exception = exception.InnerException ?? exception;
if (exception is AuthenticationException) failureType = ConnectionFailureType.AuthenticationFailure;
else if (exception is EndOfStreamException) failureType = ConnectionFailureType.SocketClosed;
else if (exception is SocketException || exception is IOException) failureType = ConnectionFailureType.SocketFailure;
else if (exception is ObjectDisposedException) failureType = ConnectionFailureType.SocketClosed;
}
}
internal void Enqueue(Message next)
{
lock (outstanding)
{
outstanding.Enqueue(next);
}
}
internal void GetCounters(ConnectionCounters counters)
{
lock (outstanding)
{
counters.SentItemsAwaitingResponse = outstanding.Count;
}
counters.Subscriptions = SubscriptionCount;
}
internal Message GetReadModeCommand(bool isMasterOnly)
{
var serverEndpoint = Bridge.ServerEndPoint;
if (serverEndpoint.RequiresReadMode)
{
ReadMode requiredReadMode = isMasterOnly ? ReadMode.ReadWrite : ReadMode.ReadOnly;
if (requiredReadMode != currentReadMode)
{
currentReadMode = requiredReadMode;
switch (requiredReadMode)
{
case ReadMode.ReadOnly: return ReusableReadOnlyCommand;
case ReadMode.ReadWrite: return ReusableReadWriteCommand;
}
}
}
else if (currentReadMode == ReadMode.ReadOnly)
{ // we don't need it (because we're not a cluster, or not a slave),
// but we are in read-only mode; switch to read-write
currentReadMode = ReadMode.ReadWrite;
return ReusableReadWriteCommand;
}
return null;
}
internal Message GetSelectDatabaseCommand(int targetDatabase, Message message)
{
if (targetDatabase < 0) return null;
if (targetDatabase != currentDatabase)
{
var serverEndpoint = Bridge.ServerEndPoint;
int available = serverEndpoint.Databases;
if (!serverEndpoint.HasDatabases) // only db0 is available on cluster/twemproxy
{
if (targetDatabase != 0)
{ // should never see this, since the API doesn't allow it; thus not too worried about ExceptionFactory
throw new RedisCommandException("Multiple databases are not supported on this server; cannot switch to database: " + targetDatabase);
}
return null;
}
if (message.Command == RedisCommand.SELECT)
{
// this could come from an EVAL/EVALSHA inside a transaction, for example; we'll accept it
Bridge.Trace("Switching database: " + targetDatabase);
currentDatabase = targetDatabase;
return null;
}
if (TransactionActive)
{// should never see this, since the API doesn't allow it; thus not too worried about ExceptionFactory
throw new RedisCommandException("Multiple databases inside a transaction are not currently supported: " + targetDatabase);
}
if (available != 0 && targetDatabase >= available) // we positively know it is out of range
{
throw ExceptionFactory.DatabaseOutfRange(Multiplexer.IncludeDetailInExceptions, targetDatabase, message, serverEndpoint);
}
Bridge.Trace("Switching database: " + targetDatabase);
currentDatabase = targetDatabase;
return GetSelectDatabaseCommand(targetDatabase);
}
return null;
}
internal static Message GetSelectDatabaseCommand(int targetDatabase)
{
return targetDatabase < DefaultRedisDatabaseCount
? ReusableChangeDatabaseCommands[targetDatabase] // 0-15 by default
: Message.Create(targetDatabase, CommandFlags.FireAndForget, RedisCommand.SELECT);
}
internal int GetSentAwaitingResponseCount()
{
lock (outstanding)
{
return outstanding.Count;
}
}
internal void GetStormLog(StringBuilder sb)
{
lock (outstanding)
{
if (outstanding.Count == 0) return;
sb.Append("Sent, awaiting response from server: ").Append(outstanding.Count).AppendLine();
int total = 0;
foreach (var item in outstanding)
{
if (++total >= 500) break;
item.AppendStormLog(sb);
sb.AppendLine();
}
}
}
internal void OnHeartbeat()
{
Interlocked.Exchange(ref lastBeatTickCount, Environment.TickCount);
}
internal void OnInternalError(Exception exception, [CallerMemberName] string origin = null)
{
Multiplexer.OnInternalError(exception, Bridge.ServerEndPoint.EndPoint, connectionType, origin);
}
internal void SetUnknownDatabase()
{ // forces next db-specific command to issue a select
currentDatabase = -1;
}
internal void Write(RedisKey key)
{
var val = key.KeyValue;
if (val is string)
{
WriteUnified(outStream, key.KeyPrefix, (string)val);
}
else
{
WriteUnified(outStream, key.KeyPrefix, (byte[])val);
}
}
internal void Write(RedisChannel channel)
{
WriteUnified(outStream, ChannelPrefix, channel.Value);
}
internal void Write(RedisValue value)
{
if (value.IsInteger)
{
WriteUnified(outStream, (long)value);
}
else
{
WriteUnified(outStream, (byte[])value);
}
}
internal void WriteHeader(RedisCommand command, int arguments)
{
var commandBytes = Multiplexer.CommandMap.GetBytes(command);
if (commandBytes == null)
{
throw ExceptionFactory.CommandDisabled(Multiplexer.IncludeDetailInExceptions, command, null, Bridge.ServerEndPoint);
}
outStream.WriteByte((byte)'*');
// remember the time of the first write that still not followed by read
Interlocked.CompareExchange(ref firstUnansweredWriteTickCount, Environment.TickCount, 0);
WriteRaw(outStream, arguments + 1);
WriteUnified(outStream, commandBytes);
}
internal const int REDIS_MAX_ARGS = 1024 * 1024; // there is a <= 1024*1024 max constraint inside redis itself: https://github.com/antirez/redis/blob/6c60526db91e23fb2d666fc52facc9a11780a2a3/src/networking.c#L1024
internal void WriteHeader(string command, int arguments)
{
if (arguments >= REDIS_MAX_ARGS) // using >= here because we will be adding 1 for the command itself (which is an arg for the purposes of the multi-bulk protocol)
{
throw ExceptionFactory.TooManyArgs(Multiplexer.IncludeDetailInExceptions, command, null, Bridge.ServerEndPoint, arguments + 1);
}
var commandBytes = Multiplexer.CommandMap.GetBytes(command);
if (commandBytes == null)
{
throw ExceptionFactory.CommandDisabled(Multiplexer.IncludeDetailInExceptions, command, null, Bridge.ServerEndPoint);
}
outStream.WriteByte((byte)'*');
// remember the time of the first write that still not followed by read
Interlocked.CompareExchange(ref firstUnansweredWriteTickCount, Environment.TickCount, 0);
WriteRaw(outStream, arguments + 1);
WriteUnified(outStream, commandBytes);
}
private static void WriteRaw(Stream stream, long value, bool withLengthPrefix = false)
{
if (value >= 0 && value <= 9)
{
if (withLengthPrefix)
{
stream.WriteByte((byte)'1');
stream.Write(Crlf, 0, 2);
}
stream.WriteByte((byte)((int)'0' + (int)value));
}
else if (value >= 10 && value < 100)
{
if (withLengthPrefix)
{
stream.WriteByte((byte)'2');
stream.Write(Crlf, 0, 2);
}
stream.WriteByte((byte)((int)'0' + ((int)value / 10)));
stream.WriteByte((byte)((int)'0' + ((int)value % 10)));
}
else if (value >= 100 && value < 1000)
{
int v = (int)value;
int units = v % 10;
v /= 10;
int tens = v % 10, hundreds = v / 10;
if (withLengthPrefix)
{
stream.WriteByte((byte)'3');
stream.Write(Crlf, 0, 2);
}
stream.WriteByte((byte)((int)'0' + hundreds));
stream.WriteByte((byte)((int)'0' + tens));
stream.WriteByte((byte)((int)'0' + units));
}
else if (value < 0 && value >= -9)
{
if (withLengthPrefix)
{
stream.WriteByte((byte)'2');
stream.Write(Crlf, 0, 2);
}
stream.WriteByte((byte)'-');
stream.WriteByte((byte)((int)'0' - (int)value));
}
else if (value <= -10 && value > -100)
{
if (withLengthPrefix)
{
stream.WriteByte((byte)'3');
stream.Write(Crlf, 0, 2);
}
value = -value;
stream.WriteByte((byte)'-');
stream.WriteByte((byte)((int)'0' + ((int)value / 10)));
stream.WriteByte((byte)((int)'0' + ((int)value % 10)));
}
else
{
var bytes = Encoding.ASCII.GetBytes(Format.ToString(value));
if (withLengthPrefix)
{
WriteRaw(stream, bytes.Length, false);
}
stream.Write(bytes, 0, bytes.Length);
}
stream.Write(Crlf, 0, 2);
}
private static void WriteUnified(Stream stream, byte[] value)
{
stream.WriteByte((byte)'$');
if (value == null)
{
WriteRaw(stream, -1); // note that not many things like this...
}
else
{
WriteRaw(stream, value.Length);
stream.Write(value, 0, value.Length);
stream.Write(Crlf, 0, 2);
}
}
internal void WriteAsHex(byte[] value)
{
var stream = outStream;
stream.WriteByte((byte)'$');
if (value == null)
{
WriteRaw(stream, -1);
}
else
{
WriteRaw(stream, value.Length * 2);
for (int i = 0; i < value.Length; i++)
{
stream.WriteByte(ToHexNibble(value[i] >> 4));
stream.WriteByte(ToHexNibble(value[i] & 15));
}
stream.Write(Crlf, 0, 2);
}
}
internal static byte ToHexNibble(int value)
{
return value < 10 ? (byte)('0' + value) : (byte)('a' - 10 + value);
}
private void WriteUnified(Stream stream, byte[] prefix, string value)
{
stream.WriteByte((byte)'$');
if (value == null)
{
WriteRaw(stream, -1); // note that not many things like this...
}
else
{
int encodedLength = Encoding.UTF8.GetByteCount(value);
if (prefix == null)
{
WriteRaw(stream, encodedLength);
WriteRaw(stream, value, encodedLength);
stream.Write(Crlf, 0, 2);
}
else
{
WriteRaw(stream, prefix.Length + encodedLength);
stream.Write(prefix, 0, prefix.Length);
WriteRaw(stream, value, encodedLength);
stream.Write(Crlf, 0, 2);
}
}
}
private unsafe void WriteRaw(Stream stream, string value, int encodedLength)
{
if (encodedLength <= ScratchSize)
{
int bytes = Encoding.UTF8.GetBytes(value, 0, value.Length, outScratch, 0);
stream.Write(outScratch, 0, bytes);
}
else
{
#if NETSTANDARD1_5
int charsRemaining = value.Length, charOffset = 0, bytesWritten;
var valueCharArray = value.ToCharArray();
while (charsRemaining > Scratch_CharsPerBlock)
{
bytesWritten = outEncoder.GetBytes(valueCharArray, charOffset, Scratch_CharsPerBlock, outScratch, 0, false);
stream.Write(outScratch, 0, bytesWritten);
charOffset += Scratch_CharsPerBlock;
charsRemaining -= Scratch_CharsPerBlock;
}
bytesWritten = outEncoder.GetBytes(valueCharArray, charOffset, charsRemaining, outScratch, 0, true);
if (bytesWritten != 0) stream.Write(outScratch, 0, bytesWritten);
#else
fixed (char* c = value)
fixed (byte* b = outScratch)
{
int charsRemaining = value.Length, charOffset = 0, bytesWritten;
while (charsRemaining > Scratch_CharsPerBlock)
{
bytesWritten = outEncoder.GetBytes(c + charOffset, Scratch_CharsPerBlock, b, ScratchSize, false);
stream.Write(outScratch, 0, bytesWritten);
charOffset += Scratch_CharsPerBlock;
charsRemaining -= Scratch_CharsPerBlock;
}
bytesWritten = outEncoder.GetBytes(c + charOffset, charsRemaining, b, ScratchSize, true);
if (bytesWritten != 0) stream.Write(outScratch, 0, bytesWritten);
}
#endif
}
}
private const int ScratchSize = 512;
private static readonly int Scratch_CharsPerBlock = ScratchSize / Encoding.UTF8.GetMaxByteCount(1);
private readonly byte[] outScratch = new byte[ScratchSize];
private readonly Encoder outEncoder = Encoding.UTF8.GetEncoder();
private static void WriteUnified(Stream stream, byte[] prefix, byte[] value)
{
stream.WriteByte((byte)'$');
if (value == null)
{
WriteRaw(stream, -1); // note that not many things like this...
}
else if (prefix == null)
{
WriteRaw(stream, value.Length);
stream.Write(value, 0, value.Length);
stream.Write(Crlf, 0, 2);
}
else
{
WriteRaw(stream, prefix.Length + value.Length);
stream.Write(prefix, 0, prefix.Length);
stream.Write(value, 0, value.Length);
stream.Write(Crlf, 0, 2);
}
}
private static void WriteUnified(Stream stream, long value)
{
// note from specification: A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
// (i.e. we can't just send ":123\r\n", we need to send "$3\r\n123\r\n"
stream.WriteByte((byte)'$');
WriteRaw(stream, value, withLengthPrefix: true);
}
private void BeginReading()
{
bool keepReading;
try
{
do
{
keepReading = false;
int space = EnsureSpaceAndComputeBytesToRead();
Multiplexer.Trace("Beginning async read...", physicalName);
#if NETSTANDARD1_5
var result = netStream.ReadAsync(ioBuffer, ioBufferBytes, space);
switch (result.Status)
{
case TaskStatus.RanToCompletion:
case TaskStatus.Faulted:
Multiplexer.Trace("Completed synchronously: processing immediately", physicalName);
keepReading = EndReading(result);
break;
default:
result.ContinueWith(endRead);
break;
}
#else
var result = netStream.BeginRead(ioBuffer, ioBufferBytes, space, endRead, this);
if (result.CompletedSynchronously)
{
Multiplexer.Trace("Completed synchronously: processing immediately", physicalName);
keepReading = EndReading(result);
}
#endif
} while (keepReading);
}
#if NETSTANDARD1_5
catch (AggregateException ex)
{
throw ex.InnerException;
}
#endif
catch (System.IO.IOException ex)
{
Multiplexer.Trace("Could not connect: " + ex.Message, physicalName);
}
}
private int haveReader;
internal int GetAvailableInboundBytes(out int activeReaders)
{
activeReaders = Interlocked.CompareExchange(ref haveReader, 0, 0);
return socketToken.Available;
}
private static LocalCertificateSelectionCallback GetAmbientCertificateCallback()
{
try
{
var pfxPath = Environment.GetEnvironmentVariable("SERedis_ClientCertPfxPath");
var pfxPassword = Environment.GetEnvironmentVariable("SERedis_ClientCertPassword");
var pfxStorageFlags = Environment.GetEnvironmentVariable("SERedis_ClientCertStorageFlags");
X509KeyStorageFlags? flags = null;
if (!string.IsNullOrEmpty(pfxStorageFlags))
{
flags = Enum.Parse(typeof(X509KeyStorageFlags), pfxStorageFlags) as X509KeyStorageFlags?;
}
if (!string.IsNullOrEmpty(pfxPath) && File.Exists(pfxPath))
{
return delegate { return new X509Certificate2(pfxPath, pfxPassword ?? "", flags ?? X509KeyStorageFlags.DefaultKeySet); };
}
}
catch
{ }
return null;
}
SocketMode ISocketCallback.Connected(Stream stream, TextWriter log)
{
try
{
var socketMode = SocketManager.DefaultSocketMode;
// disallow connection in some cases
OnDebugAbort();
// the order is important here:
// [network]<==[ssl]<==[logging]<==[buffered]
var config = Multiplexer.RawConfig;
if (config.Ssl)
{
Multiplexer.LogLocked(log, "Configuring SSL");
var host = config.SslHost;
if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(Bridge.ServerEndPoint.EndPoint);
var ssl = new SslStream(stream, false, config.CertificateValidationCallback,
config.CertificateSelectionCallback ?? GetAmbientCertificateCallback()
#if !__MonoCS__
, EncryptionPolicy.RequireEncryption
#endif
);
try
{
ssl.AuthenticateAsClient(host, config.SslProtocols);
Multiplexer.LogLocked(log, $"SSL connection established successfully using protocol: {ssl.SslProtocol}");
}
catch (AuthenticationException authexception)
{
RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure, authexception);
Multiplexer.Trace("Encryption failure");
return SocketMode.Abort;
}
stream = ssl;
socketMode = SocketMode.Async;
}
OnWrapForLogging(ref stream, physicalName);
int bufferSize = config.WriteBuffer;
netStream = stream;
outStream = bufferSize <= 0 ? stream : new BufferedStream(stream, bufferSize);
Multiplexer.LogLocked(log, "Connected {0}", Bridge);
Bridge.OnConnected(this, log);
return socketMode;
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); // includes a bridge.OnDisconnected
Multiplexer.Trace("Could not connect: " + ex.Message, physicalName);
return SocketMode.Abort;
}
}
#if NETSTANDARD1_5
private bool EndReading(Task<int> result)
{
try
{
var tmp = netStream;
int bytesRead = tmp == null ? 0 : result.Result; // note we expect this to be completed
return ProcessReadBytes(bytesRead);
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
return false;
}
}
#else
private bool EndReading(IAsyncResult result)
{
try
{
int bytesRead = netStream?.EndRead(result) ?? 0;
return ProcessReadBytes(bytesRead);
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
return false;
}
}
#endif
private int EnsureSpaceAndComputeBytesToRead()
{
int space = ioBuffer.Length - ioBufferBytes;
if (space == 0)
{
Array.Resize(ref ioBuffer, ioBuffer.Length * 2);
space = ioBuffer.Length - ioBufferBytes;
}
return space;
}
void ISocketCallback.Error()
{
RecordConnectionFailed(ConnectionFailureType.SocketFailure);
}
private void MatchResult(RawResult result)
{
// check to see if it could be an out-of-band pubsub message
if (connectionType == ConnectionType.Subscription && result.Type == ResultType.MultiBulk)
{ // out of band message does not match to a queued message
var items = result.GetItems();
if (items.Length >= 3 && items[0].IsEqual(message))
{
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = Multiplexer.ConfigurationChangedChannel;
if (configChanged != null && items[1].IsEqual(configChanged))
{
EndPoint blame = null;
try
{
if (!items[2].IsEqual(RedisLiterals.ByteWildcard))
{
blame = Format.TryParseEndPoint(items[2].GetString());
}
}
catch { /* no biggie */ }
Multiplexer.Trace("Configuration changed: " + Format.ToString(blame), physicalName);
Multiplexer.ReconfigureIfNeeded(blame, true, "broadcast");
}
// invoke the handlers
var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
Multiplexer.Trace("MESSAGE: " + channel, physicalName);
if (!channel.IsNull)
{
Multiplexer.OnMessage(channel, channel, items[2].AsRedisValue());
}
return; // AND STOP PROCESSING!
}
else if (items.Length >= 4 && items[0].IsEqual(pmessage))
{
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
Multiplexer.Trace("PMESSAGE: " + channel, physicalName);
if (!channel.IsNull)
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
Multiplexer.OnMessage(sub, channel, items[3].AsRedisValue());
}
return; // AND STOP PROCESSING!
}
// if it didn't look like "[p]message", then we still need to process the pending queue
}
Multiplexer.Trace("Matching result...", physicalName);
Message msg;
lock (outstanding)
{
Multiplexer.Trace(outstanding.Count == 0, "Nothing to respond to!", physicalName);
msg = outstanding.Dequeue();
}
Multiplexer.Trace("Response to: " + msg, physicalName);
if (msg.ComputeResult(this, result))
{
Bridge.CompleteSyncOrAsync(msg);
}
}
partial void OnCloseEcho();
partial void OnCreateEcho();
partial void OnDebugAbort();
void ISocketCallback.OnHeartbeat()
{
try
{
Bridge.OnHeartbeat(true); // all the fun code is here
}
catch (Exception ex)
{
OnInternalError(ex);
}
}
partial void OnWrapForLogging(ref Stream stream, string name);
private int ProcessBuffer(byte[] underlying, ref int offset, ref int count)
{
int messageCount = 0;
RawResult result;
do
{
int tmpOffset = offset, tmpCount = count;
// we want TryParseResult to be able to mess with these without consequence
result = TryParseResult(underlying, ref tmpOffset, ref tmpCount);
if (result.HasValue)
{
messageCount++;
// entire message: update the external counters
offset = tmpOffset;
count = tmpCount;
Multiplexer.Trace(result.ToString(), physicalName);
MatchResult(result);
}
} while (result.HasValue);
return messageCount;
}
private bool ProcessReadBytes(int bytesRead)
{
if (bytesRead <= 0)
{
Multiplexer.Trace("EOF", physicalName);
RecordConnectionFailed(ConnectionFailureType.SocketClosed);
return false;
}
Interlocked.Exchange(ref lastReadTickCount, Environment.TickCount);
// reset unanswered write timestamp
VolatileWrapper.Write(ref firstUnansweredWriteTickCount, 0);
ioBufferBytes += bytesRead;
Multiplexer.Trace("More bytes available: " + bytesRead + " (" + ioBufferBytes + ")", physicalName);
int offset = 0, count = ioBufferBytes;
int handled = ProcessBuffer(ioBuffer, ref offset, ref count);
Multiplexer.Trace("Processed: " + handled, physicalName);
if (handled != 0)
{
// read stuff
if (count != 0)
{
Multiplexer.Trace("Copying remaining bytes: " + count, physicalName);
// if anything was left over, we need to copy it to
// the start of the buffer so it can be used next time
Buffer.BlockCopy(ioBuffer, offset, ioBuffer, 0, count);
}
ioBufferBytes = count;
}
return true;
}
void ISocketCallback.Read()
{
Interlocked.Increment(ref haveReader);
try
{
do
{
int space = EnsureSpaceAndComputeBytesToRead();
int bytesRead = netStream?.Read(ioBuffer, ioBufferBytes, space) ?? 0;
if (!ProcessReadBytes(bytesRead)) return; // EOF
} while (socketToken.Available != 0);
Multiplexer.Trace("Buffer exhausted", physicalName);
// ^^^ note that the socket manager will call us again when there is something to do
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
finally
{
Interlocked.Decrement(ref haveReader);
}
}
bool ISocketCallback.IsDataAvailable
{
get
{
try { return socketToken.Available > 0; }
catch { return false; }
}
}
private RawResult ReadArray(byte[] buffer, ref int offset, ref int count)
{
var itemCount = ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
if (itemCount.HasValue)
{
if (!itemCount.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid array length", Bridge.ServerEndPoint);
int itemCountActual = checked((int)i64);
if (itemCountActual < 0)
{
//for null response by command like EXEC, RESP array: *-1\r\n
return new RawResult(ResultType.SimpleString, null, 0, 0);
}
else if (itemCountActual == 0)
{
//for zero array response by command like SCAN, Resp array: *0\r\n
return RawResult.EmptyArray;
}
var arr = new RawResult[itemCountActual];
for (int i = 0; i < itemCountActual; i++)
{
if (!(arr[i] = TryParseResult(buffer, ref offset, ref count)).HasValue)
return RawResult.Nil;
}
return new RawResult(arr);
}
return RawResult.Nil;
}
private RawResult ReadBulkString(byte[] buffer, ref int offset, ref int count)
{
var prefix = ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
if (prefix.HasValue)
{
if (!prefix.TryGetInt64(out long i64)) throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string length", Bridge.ServerEndPoint);
int bodySize = checked((int)i64);
if (bodySize < 0)
{
return new RawResult(ResultType.BulkString, null, 0, 0);
}
else if (count >= bodySize + 2)
{
if (buffer[offset + bodySize] != '\r' || buffer[offset + bodySize + 1] != '\n')
{
throw ExceptionFactory.ConnectionFailure(Multiplexer.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "Invalid bulk string terminator", Bridge.ServerEndPoint);
}
var result = new RawResult(ResultType.BulkString, buffer, offset, bodySize);
offset += bodySize + 2;
count -= bodySize + 2;
return result;
}
}
return RawResult.Nil;
}
private RawResult ReadLineTerminatedString(ResultType type, byte[] buffer, ref int offset, ref int count)
{
int max = offset + count - 2;
for (int i = offset; i < max; i++)
{
if (buffer[i + 1] == '\r' && buffer[i + 2] == '\n')
{
int len = i - offset + 1;
var result = new RawResult(type, buffer, offset, len);
count -= (len + 2);
offset += (len + 2);
return result;
}
}
return RawResult.Nil;
}
void ISocketCallback.StartReading()
{
BeginReading();
}
private RawResult TryParseResult(byte[] buffer, ref int offset, ref int count)
{
if (count == 0) return RawResult.Nil;
char resultType = (char)buffer[offset++];
count--;
switch (resultType)
{
case '+': // simple string
return ReadLineTerminatedString(ResultType.SimpleString, buffer, ref offset, ref count);
case '-': // error
return ReadLineTerminatedString(ResultType.Error, buffer, ref offset, ref count);
case ':': // integer
return ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
case '$': // bulk string
return ReadBulkString(buffer, ref offset, ref count);
case '*': // array
return ReadArray(buffer, ref offset, ref count);
default:
throw new InvalidOperationException("Unexpected response prefix: " + (char)resultType);
}
}
partial void DebugEmulateStaleConnection(ref int firstUnansweredWrite);
public void CheckForStaleConnection(ref SocketManager.ManagerState state)
{
int firstUnansweredWrite = VolatileWrapper.Read(ref firstUnansweredWriteTickCount);
DebugEmulateStaleConnection(ref firstUnansweredWrite);
int now = Environment.TickCount;
if (firstUnansweredWrite != 0 && (now - firstUnansweredWrite) > Multiplexer.RawConfig.ResponseTimeout)
{
RecordConnectionFailed(ConnectionFailureType.SocketFailure, ref state, origin: "CheckForStaleConnection");
}
}
}
}
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -65,14 +65,14 @@ internal abstract class ResultProcessor
RedisValue = new RedisValueProcessor();
public static readonly ResultProcessor<RedisValue[]>
RedisValueArray = new RedisValueArrayProcessor();
public static readonly ResultProcessor<string[]>
RedisValueArray = new RedisValueArrayProcessor();
public static readonly ResultProcessor<string[]>
StringArray = new StringArrayProcessor();
public static readonly ResultProcessor<GeoPosition?[]>
RedisGeoPositionArray = new RedisValueGeoPositionArrayProcessor();
public static readonly ResultProcessor<GeoPosition?>
RedisGeoPositionArray = new RedisValueGeoPositionArrayProcessor();
public static readonly ResultProcessor<GeoPosition?>
RedisGeoPosition = new RedisValueGeoPositionProcessor();
public static readonly ResultProcessor<TimeSpan>
......@@ -96,10 +96,10 @@ public static readonly SortedSetEntryArrayProcessor
SentinelMasterEndpoint = new SentinelGetMasterAddressByNameProcessor();
public static readonly ResultProcessor<KeyValuePair<string, string>[][]>
SentinelArrayOfArrays = new SentinelArrayOfArraysProcessor();
SentinelArrayOfArrays = new SentinelArrayOfArraysProcessor();
#endregion
public static readonly ResultProcessor<KeyValuePair<string, string>[]>
StringPairInterleaved = new StringPairInterleavedProcessor();
public static readonly TimeSpanProcessor
......@@ -107,8 +107,8 @@ public static readonly TimeSpanProcessor
TimeSpanFromSeconds = new TimeSpanProcessor(false);
public static readonly HashEntryArrayProcessor
HashEntryArray = new HashEntryArrayProcessor();
private static readonly byte[] MOVED = Encoding.UTF8.GetBytes("MOVED "), ASK = Encoding.UTF8.GetBytes("ASK ");
private static readonly byte[] MOVED = Encoding.UTF8.GetBytes("MOVED "), ASK = Encoding.UTF8.GetBytes("ASK ");
public void ConnectionFail(Message message, ConnectionFailureType fail, Exception innerException)
{
PhysicalConnection.IdentifyFailureType(innerException, ref fail);
......@@ -150,10 +150,10 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
var bridge = connection.Bridge;
var server = bridge.ServerEndPoint;
bool log = !message.IsInternalCall;
bool isMoved = result.AssertStarts(MOVED);
bool isMoved = result.AssertStarts(MOVED);
string err = string.Empty;
if (isMoved || result.AssertStarts(ASK))
{
{
message.SetResponseReceived();
log = false;
......@@ -161,38 +161,38 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
EndPoint endpoint;
if (Format.TryParseInt32(parts[1], out int hashSlot)
&& (endpoint = Format.TryParseEndPoint(parts[2])) != null)
{
// no point sending back to same server, and no point sending to a dead server
{
// no point sending back to same server, and no point sending to a dead server
if (!Equals(server.EndPoint, endpoint))
{
if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
{
connection.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK");
return false;
}
else
{
if (isMoved && (message.Flags & CommandFlags.NoRedirect) != 0)
{
err = $"Key has MOVED from Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed. ";
}
else
{
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
}
#if FEATURE_PERFCOUNTER
err += ConnectionMultiplexer.GetThreadPoolAndCPUSummary(bridge.Multiplexer.IncludePerformanceCountersInExceptions);
#endif
{
if (bridge.Multiplexer.TryResend(hashSlot, message, endpoint, isMoved))
{
connection.Multiplexer.Trace(message.Command + " re-issued to " + endpoint, isMoved ? "MOVED" : "ASK");
return false;
}
else
{
if (isMoved && (message.Flags & CommandFlags.NoRedirect) != 0)
{
err = $"Key has MOVED from Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed. ";
}
else
{
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
}
#if FEATURE_PERFCOUNTER
err += ConnectionMultiplexer.GetThreadPoolAndCPUSummary(bridge.Multiplexer.IncludePerformanceCountersInExceptions);
#endif
}
}
}
}
if (string.IsNullOrWhiteSpace(err))
{
err = result.GetString();
}
}
if (string.IsNullOrWhiteSpace(err))
{
err = result.GetString();
}
if (log)
{
bridge.Multiplexer.OnErrorMessage(server.EndPoint, err);
......@@ -214,7 +214,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
}
return true;
}
protected abstract bool SetResultCore(PhysicalConnection connection, Message message, RawResult result);
private void UnexpectedResponse(Message message, RawResult result)
......@@ -230,7 +230,7 @@ public TimeSpanProcessor(bool isMilliseconds)
{
this.isMilliseconds = isMilliseconds;
}
public bool TryParse(RawResult result, out TimeSpan? expiry)
{
switch (result.Type)
......@@ -258,7 +258,7 @@ public bool TryParse(RawResult result, out TimeSpan? expiry)
expiry = null;
return false;
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if (TryParse(result, out TimeSpan? expiry))
......@@ -276,7 +276,7 @@ public static TimerMessage CreateMessage(int db, CommandFlags flags, RedisComman
{
return new TimerMessage(db, flags, command, value);
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if (result.Type == ResultType.Error)
......@@ -309,10 +309,10 @@ internal sealed class TimerMessage : Message
public TimerMessage(int db, CommandFlags flags, RedisCommand command, RedisValue value)
: base(db, flags, command)
{
this.Watch = Stopwatch.StartNew();
Watch = Stopwatch.StartNew();
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
{
if (value.IsNull)
......@@ -344,7 +344,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return false;
}
}
internal sealed class DemandZeroOrOneProcessor : ResultProcessor<bool>
{
private static readonly byte[] zero = { (byte)'0' }, one = { (byte)'1' };
......@@ -363,7 +363,7 @@ public static bool TryGet(RawResult result, out bool value)
value = false;
return false;
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if (TryGet(result, out bool value))
......@@ -383,7 +383,7 @@ internal static bool IsSHA1(string script)
{
return script != null && sha1.IsMatch(script);
}
internal static byte[] ParseSHA1(byte[] value)
{
if (value?.Length == 40)
......@@ -400,7 +400,7 @@ internal static byte[] ParseSHA1(byte[] value)
}
return null;
}
internal static byte[] ParseSHA1(string value)
{
if (value?.Length == 40 && sha1.IsMatch(value))
......@@ -417,7 +417,7 @@ internal static byte[] ParseSHA1(string value)
}
return null;
}
private static int FromHex(char c)
{
if (c >= '0' && c <= '9') return c - '0';
......@@ -505,7 +505,7 @@ public bool TryParse(RawResult result, out T[] pairs)
return false;
}
}
protected abstract T Parse(RawResult first, RawResult second);
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
......@@ -531,7 +531,7 @@ public override bool SetResult(PhysicalConnection connection, Message message, R
}
return base.SetResult(connection, message, result);
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
var server = connection.Bridge.ServerEndPoint;
......@@ -684,7 +684,7 @@ private static string Extract(string line, string prefix)
return null;
}
}
private sealed class BooleanProcessor : ResultProcessor<bool>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
......@@ -861,7 +861,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return false;
}
}
private sealed class ExpectBasicStringProcessor : ResultProcessor<bool>
{
private readonly byte[] expected;
......@@ -869,12 +869,12 @@ public ExpectBasicStringProcessor(string value)
{
expected = Encoding.UTF8.GetBytes(value);
}
public ExpectBasicStringProcessor(byte[] value)
{
expected = value;
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
if (result.IsEqual(expected))
......@@ -945,7 +945,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return false;
}
}
private class PubSubNumSubProcessor : Int64Processor
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
......@@ -988,7 +988,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return false;
}
}
private sealed class NullableInt64Processor : ResultProcessor<long?>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
......@@ -1022,7 +1022,7 @@ public RedisChannelArrayProcessor(RedisChannel.PatternMode mode)
{
this.mode = mode;
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
......@@ -1114,8 +1114,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
return false;
}
}
}
private sealed class StringArrayProcessor : ResultProcessor<string[]>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
......@@ -1130,8 +1130,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
return false;
}
}
}
private sealed class RedisValueGeoPositionProcessor : ResultProcessor<GeoPosition?>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
......@@ -1146,8 +1146,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
}
return false;
}
}
}
private sealed class RedisValueGeoPositionArrayProcessor : ResultProcessor<GeoPosition?[]>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
......@@ -1166,27 +1166,27 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
private sealed class GeoRadiusResultArrayProcessor : ResultProcessor<GeoRadiusResult[]>
{
private static readonly GeoRadiusResultArrayProcessor[] instances;
private readonly GeoRadiusOptions options;
static GeoRadiusResultArrayProcessor()
{
instances = new GeoRadiusResultArrayProcessor[8];
for (int i = 0; i < 8; i++) instances[i] = new GeoRadiusResultArrayProcessor((GeoRadiusOptions)i);
}
public static GeoRadiusResultArrayProcessor Get(GeoRadiusOptions options)
{
int i = (int)options;
if (i < 0 || i >= instances.Length) throw new ArgumentOutOfRangeException(nameof(options));
return instances[i];
}
private GeoRadiusResultArrayProcessor(GeoRadiusOptions options)
{
this.options = options;
}
private static readonly GeoRadiusResultArrayProcessor[] instances;
private readonly GeoRadiusOptions options;
static GeoRadiusResultArrayProcessor()
{
instances = new GeoRadiusResultArrayProcessor[8];
for (int i = 0; i < 8; i++) instances[i] = new GeoRadiusResultArrayProcessor((GeoRadiusOptions)i);
}
public static GeoRadiusResultArrayProcessor Get(GeoRadiusOptions options)
{
int i = (int)options;
if (i < 0 || i >= instances.Length) throw new ArgumentOutOfRangeException(nameof(options));
return instances[i];
}
private GeoRadiusResultArrayProcessor(GeoRadiusOptions options)
{
this.options = options;
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
......@@ -1195,281 +1195,281 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
var arr = result.GetItemsAsRawResults();
GeoRadiusResult[] typed;
if (arr == null)
{
typed = null;
if (arr == null)
{
typed = null;
}
else
{
var options = this.options;
typed = new GeoRadiusResult[arr.Length];
for (int i = 0; i < arr.Length; i++)
{
typed[i] = Parse(options, arr[i]);
}
else
{
var options = this.options;
typed = new GeoRadiusResult[arr.Length];
for (int i = 0; i < arr.Length; i++)
{
typed[i] = Parse(options, arr[i]);
}
}
SetResult(message, typed);
return true;
}
return false;
}
private static GeoRadiusResult Parse(GeoRadiusOptions options, RawResult item)
{
if (options == GeoRadiusOptions.None)
{
// Without any WITH option specified, the command just returns a linear array like ["New York","Milan","Paris"].
return new GeoRadiusResult(item.AsRedisValue(), null, null, null);
}
// If WITHCOORD, WITHDIST or WITHHASH options are specified, the command returns an array of arrays, where each sub-array represents a single item.
var arr = item.GetArrayOfRawResults();
int index = 0;
// the first item in the sub-array is always the name of the returned item.
var member = arr[index++].AsRedisValue();
/* The other information is returned in the following order as successive elements of the sub-array.
The distance from the center as a floating point number, in the same unit specified in the radius.
The geohash integer.
The coordinates as a two items x,y array (longitude,latitude).
*/
double? distance = null;
GeoPosition? position = null;
long? hash = null;
if ((options & GeoRadiusOptions.WithDistance) != 0) { distance = (double?)arr[index++].AsRedisValue(); }
if ((options & GeoRadiusOptions.WithGeoHash) != 0) { hash = (long?)arr[index++].AsRedisValue(); }
if ((options & GeoRadiusOptions.WithCoordinates) != 0)
{
var coords = arr[index++].GetArrayOfRawResults();
double longitude = (double)coords[0].AsRedisValue(), latitude = (double)coords[1].AsRedisValue();
position = new GeoPosition(longitude, latitude);
}
return new GeoRadiusResult(member, distance, hash, position);
}
}
private sealed class RedisValueProcessor : ResultProcessor<RedisValue>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.Integer:
case ResultType.SimpleString:
case ResultType.BulkString:
SetResult(message, result.AsRedisValue());
return true;
}
return false;
}
}
private class ScriptResultProcessor : ResultProcessor<RedisResult>
{
private static readonly byte[] NOSCRIPT = Encoding.UTF8.GetBytes("NOSCRIPT ");
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
if (result.Type == ResultType.Error && result.AssertStarts(NOSCRIPT))
{ // scripts are not flushed individually, so assume the entire script cache is toast ("SCRIPT FLUSH")
connection.Bridge.ServerEndPoint.FlushScriptCache();
message.SetScriptUnavailable();
}
// and apply usual processing for the rest
return base.SetResult(connection, message, result);
}
// note that top-level error messages still get handled by SetResult, but nested errors
// (is that a thing?) will be wrapped in the RedisResult
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
var value = Redis.RedisResult.TryCreate(connection, result);
if (value != null)
{
SetResult(message, value);
return true;
}
return false;
}
}
private sealed class StringPairInterleavedProcessor : ValuePairInterleavedProcessorBase<KeyValuePair<string, string>>
{
protected override KeyValuePair<string, string> Parse(RawResult first, RawResult second)
{
return new KeyValuePair<string, string>(first.GetString(), second.GetString());
}
}
private sealed class StringProcessor : ResultProcessor<string>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.Integer:
case ResultType.SimpleString:
case ResultType.BulkString:
SetResult(message, result.GetString());
return true;
case ResultType.MultiBulk:
var arr = result.GetItems();
if(arr.Length == 1)
{
SetResult(message, arr[0].GetString());
return true;
}
break;
}
return false;
}
}
private class TracerProcessor : ResultProcessor<bool>
{
private static readonly byte[]
authRequired = Encoding.UTF8.GetBytes("NOAUTH Authentication required."),
authFail = Encoding.UTF8.GetBytes("ERR operation not permitted"),
loading = Encoding.UTF8.GetBytes("LOADING ");
private readonly bool establishConnection;
public TracerProcessor(bool establishConnection)
{
this.establishConnection = establishConnection;
}
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
var final = base.SetResult(connection, message, result);
if (result.IsError)
{
if (result.IsEqual(authFail) || result.IsEqual(authRequired))
{
connection.RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure, new Exception(result.ToString() + " Verify if the Redis password provided is correct."));
}
else if (result.AssertStarts(loading))
{
connection.RecordConnectionFailed(ConnectionFailureType.Loading);
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
}
}
return final;
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
bool happy;
switch (message.Command)
{
case RedisCommand.ECHO:
happy = result.Type == ResultType.BulkString && (!establishConnection || result.IsEqual(connection.Multiplexer.UniqueId));
break;
case RedisCommand.PING:
happy = result.Type == ResultType.SimpleString && result.IsEqual(RedisLiterals.BytesPONG);
break;
case RedisCommand.TIME:
happy = result.Type == ResultType.MultiBulk && result.GetItems().Length == 2;
break;
case RedisCommand.EXISTS:
happy = result.Type == ResultType.Integer;
break;
default:
happy = true;
break;
}
if (happy)
{
if (establishConnection) connection.Bridge.OnFullyEstablished(connection);
SetResult(message, happy);
return true;
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
return false;
}
}
}
}
private static GeoRadiusResult Parse(GeoRadiusOptions options, RawResult item)
{
if (options == GeoRadiusOptions.None)
{
// Without any WITH option specified, the command just returns a linear array like ["New York","Milan","Paris"].
return new GeoRadiusResult(item.AsRedisValue(), null, null, null);
}
// If WITHCOORD, WITHDIST or WITHHASH options are specified, the command returns an array of arrays, where each sub-array represents a single item.
var arr = item.GetArrayOfRawResults();
int index = 0;
// the first item in the sub-array is always the name of the returned item.
var member = arr[index++].AsRedisValue();
/* The other information is returned in the following order as successive elements of the sub-array.
The distance from the center as a floating point number, in the same unit specified in the radius.
The geohash integer.
The coordinates as a two items x,y array (longitude,latitude).
*/
double? distance = null;
GeoPosition? position = null;
long? hash = null;
if ((options & GeoRadiusOptions.WithDistance) != 0) { distance = (double?)arr[index++].AsRedisValue(); }
if ((options & GeoRadiusOptions.WithGeoHash) != 0) { hash = (long?)arr[index++].AsRedisValue(); }
if ((options & GeoRadiusOptions.WithCoordinates) != 0)
{
var coords = arr[index++].GetArrayOfRawResults();
double longitude = (double)coords[0].AsRedisValue(), latitude = (double)coords[1].AsRedisValue();
position = new GeoPosition(longitude, latitude);
}
return new GeoRadiusResult(member, distance, hash, position);
}
}
private sealed class RedisValueProcessor : ResultProcessor<RedisValue>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.Integer:
case ResultType.SimpleString:
case ResultType.BulkString:
SetResult(message, result.AsRedisValue());
return true;
}
return false;
}
}
private class ScriptResultProcessor : ResultProcessor<RedisResult>
{
private static readonly byte[] NOSCRIPT = Encoding.UTF8.GetBytes("NOSCRIPT ");
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
if (result.Type == ResultType.Error && result.AssertStarts(NOSCRIPT))
{ // scripts are not flushed individually, so assume the entire script cache is toast ("SCRIPT FLUSH")
connection.Bridge.ServerEndPoint.FlushScriptCache();
message.SetScriptUnavailable();
}
// and apply usual processing for the rest
return base.SetResult(connection, message, result);
}
// note that top-level error messages still get handled by SetResult, but nested errors
// (is that a thing?) will be wrapped in the RedisResult
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
var value = Redis.RedisResult.TryCreate(connection, result);
if (value != null)
{
SetResult(message, value);
return true;
}
return false;
}
}
private sealed class StringPairInterleavedProcessor : ValuePairInterleavedProcessorBase<KeyValuePair<string, string>>
{
protected override KeyValuePair<string, string> Parse(RawResult first, RawResult second)
{
return new KeyValuePair<string, string>(first.GetString(), second.GetString());
}
}
private sealed class StringProcessor : ResultProcessor<string>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.Integer:
case ResultType.SimpleString:
case ResultType.BulkString:
SetResult(message, result.GetString());
return true;
case ResultType.MultiBulk:
var arr = result.GetItems();
if (arr.Length == 1)
{
SetResult(message, arr[0].GetString());
return true;
}
break;
}
return false;
}
}
private class TracerProcessor : ResultProcessor<bool>
{
private static readonly byte[]
authRequired = Encoding.UTF8.GetBytes("NOAUTH Authentication required."),
authFail = Encoding.UTF8.GetBytes("ERR operation not permitted"),
loading = Encoding.UTF8.GetBytes("LOADING ");
private readonly bool establishConnection;
public TracerProcessor(bool establishConnection)
{
this.establishConnection = establishConnection;
}
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
var final = base.SetResult(connection, message, result);
if (result.IsError)
{
if (result.IsEqual(authFail) || result.IsEqual(authRequired))
{
connection.RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure, new Exception(result.ToString() + " Verify if the Redis password provided is correct."));
}
else if (result.AssertStarts(loading))
{
connection.RecordConnectionFailed(ConnectionFailureType.Loading);
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
}
}
return final;
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
bool happy;
switch (message.Command)
{
case RedisCommand.ECHO:
happy = result.Type == ResultType.BulkString && (!establishConnection || result.IsEqual(connection.Multiplexer.UniqueId));
break;
case RedisCommand.PING:
happy = result.Type == ResultType.SimpleString && result.IsEqual(RedisLiterals.BytesPONG);
break;
case RedisCommand.TIME:
happy = result.Type == ResultType.MultiBulk && result.GetItems().Length == 2;
break;
case RedisCommand.EXISTS:
happy = result.Type == ResultType.Integer;
break;
default:
happy = true;
break;
}
if (happy)
{
if (establishConnection) connection.Bridge.OnFullyEstablished(connection);
SetResult(message, happy);
return true;
}
else
{
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure);
return false;
}
}
}
#region Sentinel
private sealed class SentinelGetMasterAddressByNameProcessor : ResultProcessor<EndPoint>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.MultiBulk:
var arr = result.GetItemsAsValues();
int port;
if (arr.Length == 2 && int.TryParse(arr[1], out port))
{
SetResult(message, Format.ParseEndPoint(arr[0], port));
return true;
}
else if (arr.Length == 0)
{
SetResult(message, null);
return true;
}
break;
case ResultType.SimpleString:
//We don't want to blow up if the master is not found
if (result.IsNull)
return true;
break;
}
return false;
}
}
private sealed class SentinelArrayOfArraysProcessor : ResultProcessor<KeyValuePair<string, string>[][]>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
var innerProcessor = StringPairInterleaved as StringPairInterleavedProcessor;
if (innerProcessor == null)
{
return false;
}
switch (result.Type)
{
case ResultType.MultiBulk:
var arrayOfArrays = result.GetArrayOfRawResults();
var returnArray = new KeyValuePair<string, string>[arrayOfArrays.Length][];
for (int i = 0; i < arrayOfArrays.Length; i++)
{
var rawInnerArray = arrayOfArrays[i];
innerProcessor.TryParse(rawInnerArray, out KeyValuePair<string, string>[] kvpArray);
returnArray[i] = kvpArray;
}
SetResult(message, returnArray);
return true;
}
return false;
}
}
private sealed class SentinelGetMasterAddressByNameProcessor : ResultProcessor<EndPoint>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
switch (result.Type)
{
case ResultType.MultiBulk:
var arr = result.GetItemsAsValues();
int port;
if (arr.Length == 2 && int.TryParse(arr[1], out port))
{
SetResult(message, Format.ParseEndPoint(arr[0], port));
return true;
}
else if (arr.Length == 0)
{
SetResult(message, null);
return true;
}
break;
case ResultType.SimpleString:
//We don't want to blow up if the master is not found
if (result.IsNull)
return true;
break;
}
return false;
}
}
private sealed class SentinelArrayOfArraysProcessor : ResultProcessor<KeyValuePair<string, string>[][]>
{
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{
var innerProcessor = StringPairInterleaved as StringPairInterleavedProcessor;
if (innerProcessor == null)
{
return false;
}
switch (result.Type)
{
case ResultType.MultiBulk:
var arrayOfArrays = result.GetArrayOfRawResults();
var returnArray = new KeyValuePair<string, string>[arrayOfArrays.Length][];
for (int i = 0; i < arrayOfArrays.Length; i++)
{
var rawInnerArray = arrayOfArrays[i];
innerProcessor.TryParse(rawInnerArray, out KeyValuePair<string, string>[] kvpArray);
returnArray[i] = kvpArray;
}
SetResult(message, returnArray);
return true;
}
return false;
}
}
#endregion
}
internal abstract class ResultProcessor<T> : ResultProcessor
{
protected void SetResult(Message message, T value)
{
if (message == null) return;
var box = message.ResultBox as ResultBox<T>;
message.SetResponseReceived();
box?.SetResult(value);
}
}
internal abstract class ResultProcessor<T> : ResultProcessor
{
protected void SetResult(Message message, T value)
{
if (message == null) return;
var box = message.ResultBox as ResultBox<T>;
message.SetResponseReceived();
box?.SetResult(value);
}
}
}
......@@ -199,7 +199,7 @@ public void SetClusterConfiguration(ClusterConfiguration configuration)
multiplexer.Trace("Updating cluster ranges...");
multiplexer.UpdateClusterRange(configuration);
multiplexer.Trace("Resolving genealogy...");
var thisNode = configuration.Nodes.FirstOrDefault(x => x.EndPoint.Equals(this.EndPoint));
var thisNode = configuration.Nodes.FirstOrDefault(x => x.EndPoint.Equals(EndPoint));
if (thisNode != null)
{
List<ServerEndPoint> slaves = null;
......@@ -212,8 +212,7 @@ public void SetClusterConfiguration(ClusterConfiguration configuration)
}
else if (node.ParentNodeId == thisNode.NodeId)
{
if (slaves == null) slaves = new List<ServerEndPoint>();
slaves.Add(multiplexer.GetServerEndPoint(node.EndPoint));
(slaves ?? (slaves = new List<ServerEndPoint>())).Add(multiplexer.GetServerEndPoint(node.EndPoint));
}
}
Master = master;
......
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
#if NETSTANDARD1_5
using System.Runtime.InteropServices;
using System.Threading.Tasks;
#endif
namespace StackExchange.Redis
{
internal enum SocketMode
{
Abort,
Poll,
Async
}
/// <summary>
/// Allows callbacks from SocketManager as work is discovered
/// </summary>
internal partial interface ISocketCallback
{
/// <summary>
/// Indicates that a socket has connected
/// </summary>
/// <param name="stream">The network stream for this socket.</param>
/// <param name="log">A text logger to write to.</param>
SocketMode Connected(Stream stream, TextWriter log);
/// <summary>
/// Indicates that the socket has signalled an error condition
/// </summary>
void Error();
void OnHeartbeat();
/// <summary>
/// Indicates that data is available on the socket, and that the consumer should read synchronously from the socket while there is data
/// </summary>
void Read();
/// <summary>
/// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously
/// </summary>
void StartReading();
// check for write-read timeout
void CheckForStaleConnection(ref SocketManager.ManagerState state);
bool IsDataAvailable { get; }
}
internal struct SocketToken
{
internal readonly Socket Socket;
public SocketToken(Socket socket)
{
Socket = socket;
}
public int Available => Socket?.Available ?? 0;
public bool HasValue => Socket != null;
}
/// <summary>
/// A SocketManager monitors multiple sockets for availability of data; this is done using
/// the Socket.Select API and a dedicated reader-thread, which allows for fast responses
/// even when the system is under ambient load.
/// </summary>
public sealed partial class SocketManager : IDisposable
{
internal enum ManagerState
{
Inactive,
Preparing,
Faulted,
CheckForHeartbeat,
ExecuteHeartbeat,
LocateActiveSockets,
NoSocketsPause,
PrepareActiveSockets,
CullDeadSockets,
NoActiveSocketsPause,
GrowingSocketArray,
CopyingPointersForSelect,
ExecuteSelect,
ExecuteSelectComplete,
CheckForStaleConnections,
RecordConnectionFailed_OnInternalError,
RecordConnectionFailed_OnDisconnected,
RecordConnectionFailed_ReportFailure,
RecordConnectionFailed_OnConnectionFailed,
RecordConnectionFailed_FailOutstanding,
RecordConnectionFailed_ShutdownSocket,
CheckForStaleConnectionsDone,
EnqueueRead,
EnqueueError,
EnqueueReadFallback,
RequestAssistance,
ProcessQueues,
ProcessReadQueue,
ProcessErrorQueue,
}
private static readonly ParameterizedThreadStart writeAllQueues = context =>
{
try { ((SocketManager)context).WriteAllQueues(); } catch { }
};
private static readonly WaitCallback writeOneQueue = context =>
{
try { ((SocketManager)context).WriteOneQueue(); } catch { }
};
private readonly Queue<PhysicalBridge> writeQueue = new Queue<PhysicalBridge>();
private bool isDisposed;
private readonly bool useHighPrioritySocketThreads = true;
/// <summary>
/// Gets the name of this SocketManager instance
/// </summary>
public string Name { get; }
/// <summary>
/// Creates a new (optionally named) <see cref="SocketManager"/> instance
/// </summary>
/// <param name="name">The name for this <see cref="SocketManager"/>.</param>
public SocketManager(string name = null) : this(name, true) { }
/// <summary>
/// Creates a new <see cref="SocketManager"/> instance
/// </summary>
/// <param name="name">The name for this <see cref="SocketManager"/>.</param>
/// <param name="useHighPrioritySocketThreads">Whether this <see cref="SocketManager"/> should use high priority sockets.</param>
public SocketManager(string name, bool useHighPrioritySocketThreads)
{
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
Name = name;
this.useHighPrioritySocketThreads = useHighPrioritySocketThreads;
// we need a dedicated writer, because when under heavy ambient load
// (a busy asp.net site, for example), workers are not reliable enough
#if NETSTANDARD1_5
var dedicatedWriter = new Thread(writeAllQueues);
#else
var dedicatedWriter = new Thread(writeAllQueues, 32 * 1024); // don't need a huge stack;
dedicatedWriter.Priority = useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal;
#endif
dedicatedWriter.Name = name + ":Write";
dedicatedWriter.IsBackground = true; // should not keep process alive
dedicatedWriter.Start(this); // will self-exit when disposed
}
private enum CallbackOperation
{
Read,
Error
}
/// <summary>
/// Releases all resources associated with this instance
/// </summary>
public void Dispose()
{
lock (writeQueue)
{
// make sure writer threads know to exit
isDisposed = true;
Monitor.PulseAll(writeQueue);
}
OnDispose();
}
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log)
{
var addressFamily = endpoint.AddressFamily == AddressFamily.Unspecified ? AddressFamily.InterNetwork : endpoint.AddressFamily;
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
SetFastLoopbackOption(socket);
socket.NoDelay = true;
try
{
CompletionType connectCompletionType = CompletionType.Any;
this.ShouldForceConnectCompletionType(ref connectCompletionType);
var formattedEndpoint = Format.ToString(endpoint);
var tuple = Tuple.Create(socket, callback);
if (endpoint is DnsEndPoint dnsEndpoint)
{
// A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state)
#if !FEATURE_THREADPOOL
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
socket.ConnectAsync(dnsEndpoint.Host, dnsEndpoint.Port).ContinueWith(t =>
{
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(t, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
});
#else
CompletionTypeHelper.RunWithCompletionType(
cb => {
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
return socket.BeginConnect(dnsEndpoint.Host, dnsEndpoint.Port, cb, tuple);
},
ar => {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
},
connectCompletionType);
#endif
}
else
{
#if !FEATURE_THREADPOOL
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
socket.ConnectAsync(endpoint).ContinueWith(t =>
{
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(t, multiplexer, log, tuple);
});
#else
CompletionTypeHelper.RunWithCompletionType(
cb => {
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
return socket.BeginConnect(endpoint, cb, tuple);
},
ar => {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
},
connectCompletionType);
#endif
}
}
catch (NotImplementedException ex)
{
if (!(endpoint is IPEndPoint))
{
throw new InvalidOperationException("BeginConnect failed with NotImplementedException; consider using IP endpoints, or enable ResolveDns in the configuration", ex);
}
throw;
}
var token = new SocketToken(socket);
return token;
}
internal void SetFastLoopbackOption(Socket socket)
{
// SIO_LOOPBACK_FAST_PATH (https://msdn.microsoft.com/en-us/library/windows/desktop/jj841212%28v=vs.85%29.aspx)
// Speeds up localhost operations significantly. OK to apply to a socket that will not be hooked up to localhost,
// or will be subject to WFP filtering.
const int SIO_LOOPBACK_FAST_PATH = -1744830448;
#if NETSTANDARD1_5
try
{
// Ioctl is not supported on other platforms at the moment
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
byte[] optionInValue = BitConverter.GetBytes(1);
socket.IOControl(SIO_LOOPBACK_FAST_PATH, optionInValue, null);
}
}
catch (SocketException) { }
catch (PlatformNotSupportedException)
{
// Fix for https://github.com/StackExchange/StackExchange.Redis/issues/582
// Checking the platform can fail on some platforms. However, we don't
// care if the platform check fails because this is for a Windows
// optimization, and checking the platform will not fail on Windows.
}
#else
// windows only
if (Environment.OSVersion.Platform == PlatformID.Win32NT)
{
// Win8/Server2012+ only
var osVersion = Environment.OSVersion.Version;
if (osVersion.Major > 6 || (osVersion.Major == 6 && osVersion.Minor >= 2))
{
byte[] optionInValue = BitConverter.GetBytes(1);
socket.IOControl(SIO_LOOPBACK_FAST_PATH, optionInValue, null);
}
}
#endif
}
internal void RequestWrite(PhysicalBridge bridge, bool forced)
{
if (Interlocked.CompareExchange(ref bridge.inWriteQueue, 1, 0) == 0 || forced)
{
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
if (writeQueue.Count == 1)
{
Monitor.PulseAll(writeQueue);
}
else if (writeQueue.Count >= 2)
{ // struggling are we? let's have some help dealing with the backlog
ThreadPool.QueueUserWorkItem(writeOneQueue, this);
}
}
}
}
internal void Shutdown(SocketToken token)
{
Shutdown(token.Socket);
}
private void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multiplexer, TextWriter log, Tuple<Socket, ISocketCallback> tuple)
{
try
{
bool ignoreConnect = false;
ShouldIgnoreConnect(tuple.Item2, ref ignoreConnect);
if (ignoreConnect) return;
var socket = tuple.Item1;
var callback = tuple.Item2;
#if NETSTANDARD1_5
multiplexer.Wait((Task)ar); // make it explode if invalid (note: already complete at this point)
#else
socket.EndConnect(ar);
#endif
var netStream = new NetworkStream(socket, false);
var socketMode = callback?.Connected(netStream, log) ?? SocketMode.Abort;
switch (socketMode)
{
case SocketMode.Poll:
multiplexer.LogLocked(log, "Starting poll");
OnAddRead(socket, callback);
break;
case SocketMode.Async:
multiplexer.LogLocked(log, "Starting read");
try
{ callback.StartReading(); }
catch (Exception ex)
{
ConnectionMultiplexer.TraceWithoutContext(ex.Message);
Shutdown(socket);
}
break;
default:
ConnectionMultiplexer.TraceWithoutContext("Aborting socket");
Shutdown(socket);
break;
}
}
catch (ObjectDisposedException)
{
multiplexer.LogLocked(log, "(socket shutdown)");
if (tuple != null)
{
try
{ tuple.Item2.Error(); }
catch (Exception inner)
{
ConnectionMultiplexer.TraceWithoutContext(inner.Message);
}
}
}
catch(Exception outer)
{
ConnectionMultiplexer.TraceWithoutContext(outer.Message);
if (tuple != null)
{
try
{ tuple.Item2.Error(); }
catch (Exception inner)
{
ConnectionMultiplexer.TraceWithoutContext(inner.Message);
}
}
}
}
partial void OnDispose();
partial void OnShutdown(Socket socket);
partial void ShouldIgnoreConnect(ISocketCallback callback, ref bool ignore);
partial void ShouldForceConnectCompletionType(ref CompletionType completionType);
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
private void Shutdown(Socket socket)
{
if (socket != null)
{
OnShutdown(socket);
try { socket.Shutdown(SocketShutdown.Both); } catch { }
#if !NETSTANDARD1_5
try { socket.Close(); } catch { }
#endif
try { socket.Dispose(); } catch { }
}
}
private void WriteAllQueues()
{
while (true)
{
PhysicalBridge bridge;
lock (writeQueue)
{
if (writeQueue.Count == 0)
{
if (isDisposed) break; // <========= exit point
Monitor.Wait(writeQueue);
if (isDisposed) break; // (woken by Dispose)
if (writeQueue.Count == 0) continue; // still nothing...
}
bridge = writeQueue.Dequeue();
}
switch (bridge.WriteQueue(200))
{
case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
// back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
break;
case WriteResult.CompetingWriter:
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
break;
case WriteResult.NothingToDo:
if (!bridge.ConfirmRemoveFromWriteQueue())
{ // more snuck in; back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
}
break;
}
}
}
private void WriteOneQueue()
{
PhysicalBridge bridge;
lock (writeQueue)
{
bridge = writeQueue.Count == 0 ? null : writeQueue.Dequeue();
}
if (bridge == null) return;
bool keepGoing;
do
{
switch (bridge.WriteQueue(-1))
{
case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
keepGoing = true;
break;
case WriteResult.NothingToDo:
keepGoing = !bridge.ConfirmRemoveFromWriteQueue();
break;
case WriteResult.CompetingWriter:
keepGoing = false;
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
keepGoing = false;
break;
default:
keepGoing = false;
break;
}
} while (keepGoing);
}
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
#if NETSTANDARD1_5
using System.Runtime.InteropServices;
using System.Threading.Tasks;
#endif
namespace StackExchange.Redis
{
internal enum SocketMode
{
Abort,
Poll,
Async
}
/// <summary>
/// Allows callbacks from SocketManager as work is discovered
/// </summary>
internal partial interface ISocketCallback
{
/// <summary>
/// Indicates that a socket has connected
/// </summary>
/// <param name="stream">The network stream for this socket.</param>
/// <param name="log">A text logger to write to.</param>
SocketMode Connected(Stream stream, TextWriter log);
/// <summary>
/// Indicates that the socket has signalled an error condition
/// </summary>
void Error();
void OnHeartbeat();
/// <summary>
/// Indicates that data is available on the socket, and that the consumer should read synchronously from the socket while there is data
/// </summary>
void Read();
/// <summary>
/// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously
/// </summary>
void StartReading();
// check for write-read timeout
void CheckForStaleConnection(ref SocketManager.ManagerState state);
bool IsDataAvailable { get; }
}
internal struct SocketToken
{
internal readonly Socket Socket;
public SocketToken(Socket socket)
{
Socket = socket;
}
public int Available => Socket?.Available ?? 0;
public bool HasValue => Socket != null;
}
/// <summary>
/// A SocketManager monitors multiple sockets for availability of data; this is done using
/// the Socket.Select API and a dedicated reader-thread, which allows for fast responses
/// even when the system is under ambient load.
/// </summary>
public sealed partial class SocketManager : IDisposable
{
internal enum ManagerState
{
Inactive,
Preparing,
Faulted,
CheckForHeartbeat,
ExecuteHeartbeat,
LocateActiveSockets,
NoSocketsPause,
PrepareActiveSockets,
CullDeadSockets,
NoActiveSocketsPause,
GrowingSocketArray,
CopyingPointersForSelect,
ExecuteSelect,
ExecuteSelectComplete,
CheckForStaleConnections,
RecordConnectionFailed_OnInternalError,
RecordConnectionFailed_OnDisconnected,
RecordConnectionFailed_ReportFailure,
RecordConnectionFailed_OnConnectionFailed,
RecordConnectionFailed_FailOutstanding,
RecordConnectionFailed_ShutdownSocket,
CheckForStaleConnectionsDone,
EnqueueRead,
EnqueueError,
EnqueueReadFallback,
RequestAssistance,
ProcessQueues,
ProcessReadQueue,
ProcessErrorQueue,
}
private static readonly ParameterizedThreadStart writeAllQueues = context =>
{
try { ((SocketManager)context).WriteAllQueues(); } catch { }
};
private static readonly WaitCallback writeOneQueue = context =>
{
try { ((SocketManager)context).WriteOneQueue(); } catch { }
};
private readonly Queue<PhysicalBridge> writeQueue = new Queue<PhysicalBridge>();
private bool isDisposed;
private readonly bool useHighPrioritySocketThreads = true;
/// <summary>
/// Gets the name of this SocketManager instance
/// </summary>
public string Name { get; }
/// <summary>
/// Creates a new (optionally named) <see cref="SocketManager"/> instance
/// </summary>
/// <param name="name">The name for this <see cref="SocketManager"/>.</param>
public SocketManager(string name = null) : this(name, true) { }
/// <summary>
/// Creates a new <see cref="SocketManager"/> instance
/// </summary>
/// <param name="name">The name for this <see cref="SocketManager"/>.</param>
/// <param name="useHighPrioritySocketThreads">Whether this <see cref="SocketManager"/> should use high priority sockets.</param>
public SocketManager(string name, bool useHighPrioritySocketThreads)
{
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
Name = name;
this.useHighPrioritySocketThreads = useHighPrioritySocketThreads;
// we need a dedicated writer, because when under heavy ambient load
// (a busy asp.net site, for example), workers are not reliable enough
#if NETSTANDARD1_5
var dedicatedWriter = new Thread(writeAllQueues);
#else
var dedicatedWriter = new Thread(writeAllQueues, 32 * 1024); // don't need a huge stack;
dedicatedWriter.Priority = useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal;
#endif
dedicatedWriter.Name = name + ":Write";
dedicatedWriter.IsBackground = true; // should not keep process alive
dedicatedWriter.Start(this); // will self-exit when disposed
}
private enum CallbackOperation
{
Read,
Error
}
/// <summary>
/// Releases all resources associated with this instance
/// </summary>
public void Dispose()
{
lock (writeQueue)
{
// make sure writer threads know to exit
isDisposed = true;
Monitor.PulseAll(writeQueue);
}
OnDispose();
}
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log)
{
var addressFamily = endpoint.AddressFamily == AddressFamily.Unspecified ? AddressFamily.InterNetwork : endpoint.AddressFamily;
var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
SetFastLoopbackOption(socket);
socket.NoDelay = true;
try
{
var connectCompletionType = CompletionType.Any;
ShouldForceConnectCompletionType(ref connectCompletionType);
var formattedEndpoint = Format.ToString(endpoint);
var tuple = Tuple.Create(socket, callback);
if (endpoint is DnsEndPoint dnsEndpoint)
{
// A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state)
#if !FEATURE_THREADPOOL
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
socket.ConnectAsync(dnsEndpoint.Host, dnsEndpoint.Port).ContinueWith(t =>
{
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(t, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
});
#else
CompletionTypeHelper.RunWithCompletionType(
cb => {
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
return socket.BeginConnect(dnsEndpoint.Host, dnsEndpoint.Port, cb, tuple);
},
ar => {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
},
connectCompletionType);
#endif
}
else
{
#if !FEATURE_THREADPOOL
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
socket.ConnectAsync(endpoint).ContinueWith(t =>
{
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(t, multiplexer, log, tuple);
});
#else
CompletionTypeHelper.RunWithCompletionType(
cb => {
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
return socket.BeginConnect(endpoint, cb, tuple);
},
ar => {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
},
connectCompletionType);
#endif
}
}
catch (NotImplementedException ex)
{
if (!(endpoint is IPEndPoint))
{
throw new InvalidOperationException("BeginConnect failed with NotImplementedException; consider using IP endpoints, or enable ResolveDns in the configuration", ex);
}
throw;
}
var token = new SocketToken(socket);
return token;
}
internal void SetFastLoopbackOption(Socket socket)
{
// SIO_LOOPBACK_FAST_PATH (https://msdn.microsoft.com/en-us/library/windows/desktop/jj841212%28v=vs.85%29.aspx)
// Speeds up localhost operations significantly. OK to apply to a socket that will not be hooked up to localhost,
// or will be subject to WFP filtering.
const int SIO_LOOPBACK_FAST_PATH = -1744830448;
#if NETSTANDARD1_5
try
{
// Ioctl is not supported on other platforms at the moment
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
byte[] optionInValue = BitConverter.GetBytes(1);
socket.IOControl(SIO_LOOPBACK_FAST_PATH, optionInValue, null);
}
}
catch (SocketException) { }
catch (PlatformNotSupportedException)
{
// Fix for https://github.com/StackExchange/StackExchange.Redis/issues/582
// Checking the platform can fail on some platforms. However, we don't
// care if the platform check fails because this is for a Windows
// optimization, and checking the platform will not fail on Windows.
}
#else
// windows only
if (Environment.OSVersion.Platform == PlatformID.Win32NT)
{
// Win8/Server2012+ only
var osVersion = Environment.OSVersion.Version;
if (osVersion.Major > 6 || (osVersion.Major == 6 && osVersion.Minor >= 2))
{
byte[] optionInValue = BitConverter.GetBytes(1);
socket.IOControl(SIO_LOOPBACK_FAST_PATH, optionInValue, null);
}
}
#endif
}
internal void RequestWrite(PhysicalBridge bridge, bool forced)
{
if (Interlocked.CompareExchange(ref bridge.inWriteQueue, 1, 0) == 0 || forced)
{
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
if (writeQueue.Count == 1)
{
Monitor.PulseAll(writeQueue);
}
else if (writeQueue.Count >= 2)
{ // struggling are we? let's have some help dealing with the backlog
ThreadPool.QueueUserWorkItem(writeOneQueue, this);
}
}
}
}
internal void Shutdown(SocketToken token)
{
Shutdown(token.Socket);
}
private void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multiplexer, TextWriter log, Tuple<Socket, ISocketCallback> tuple)
{
try
{
bool ignoreConnect = false;
ShouldIgnoreConnect(tuple.Item2, ref ignoreConnect);
if (ignoreConnect) return;
var socket = tuple.Item1;
var callback = tuple.Item2;
#if NETSTANDARD1_5
multiplexer.Wait((Task)ar); // make it explode if invalid (note: already complete at this point)
#else
socket.EndConnect(ar);
#endif
var netStream = new NetworkStream(socket, false);
var socketMode = callback?.Connected(netStream, log) ?? SocketMode.Abort;
switch (socketMode)
{
case SocketMode.Poll:
multiplexer.LogLocked(log, "Starting poll");
OnAddRead(socket, callback);
break;
case SocketMode.Async:
multiplexer.LogLocked(log, "Starting read");
try
{ callback.StartReading(); }
catch (Exception ex)
{
ConnectionMultiplexer.TraceWithoutContext(ex.Message);
Shutdown(socket);
}
break;
default:
ConnectionMultiplexer.TraceWithoutContext("Aborting socket");
Shutdown(socket);
break;
}
}
catch (ObjectDisposedException)
{
multiplexer.LogLocked(log, "(socket shutdown)");
if (tuple != null)
{
try
{ tuple.Item2.Error(); }
catch (Exception inner)
{
ConnectionMultiplexer.TraceWithoutContext(inner.Message);
}
}
}
catch(Exception outer)
{
ConnectionMultiplexer.TraceWithoutContext(outer.Message);
if (tuple != null)
{
try
{ tuple.Item2.Error(); }
catch (Exception inner)
{
ConnectionMultiplexer.TraceWithoutContext(inner.Message);
}
}
}
}
partial void OnDispose();
partial void OnShutdown(Socket socket);
partial void ShouldIgnoreConnect(ISocketCallback callback, ref bool ignore);
partial void ShouldForceConnectCompletionType(ref CompletionType completionType);
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
private void Shutdown(Socket socket)
{
if (socket != null)
{
OnShutdown(socket);
try { socket.Shutdown(SocketShutdown.Both); } catch { }
#if !NETSTANDARD1_5
try { socket.Close(); } catch { }
#endif
try { socket.Dispose(); } catch { }
}
}
private void WriteAllQueues()
{
while (true)
{
PhysicalBridge bridge;
lock (writeQueue)
{
if (writeQueue.Count == 0)
{
if (isDisposed) break; // <========= exit point
Monitor.Wait(writeQueue);
if (isDisposed) break; // (woken by Dispose)
if (writeQueue.Count == 0) continue; // still nothing...
}
bridge = writeQueue.Dequeue();
}
switch (bridge.WriteQueue(200))
{
case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
// back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
break;
case WriteResult.CompetingWriter:
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
break;
case WriteResult.NothingToDo:
if (!bridge.ConfirmRemoveFromWriteQueue())
{ // more snuck in; back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
}
break;
}
}
}
private void WriteOneQueue()
{
PhysicalBridge bridge;
lock (writeQueue)
{
bridge = writeQueue.Count == 0 ? null : writeQueue.Dequeue();
}
if (bridge == null) return;
bool keepGoing;
do
{
switch (bridge.WriteQueue(-1))
{
case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
keepGoing = true;
break;
case WriteResult.NothingToDo:
keepGoing = !bridge.ConfirmRemoveFromWriteQueue();
break;
case WriteResult.CompetingWriter:
keepGoing = false;
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
keepGoing = false;
break;
default:
keepGoing = false;
break;
}
} while (keepGoing);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment