Commit d0880697 authored by Marc Gravell's avatar Marc Gravell

Simpler SSL connect

parent bc9e4fe2
......@@ -4,6 +4,7 @@
using System.IO;
using System.Net;
using NUnit.Framework;
using System.Linq;
namespace StackExchange.Redis.Tests
{
......@@ -11,10 +12,17 @@ namespace StackExchange.Redis.Tests
public class SSL : TestBase
{
[Test]
[TestCase(6379, null)]
[TestCase(6380, "as if we care")]
public void ConnectToSSLServer(int port, string sslHost)
[TestCase(6379, false, false)]
[TestCase(6380, true, false)]
[TestCase(6380, true, true)]
public void ConnectToSSLServer(int port, bool useSsl, bool specifyHost)
{
string host = null;
const string path = @"D:\RedisSslHost.txt"; // because I choose not to advertise my server here!
if (File.Exists(path)) host = File.ReadLines(path).First();
if (string.IsNullOrWhiteSpace(host)) Assert.Inconclusive("no ssl host specified at: " + path);
var config = new ConfigurationOptions
{
CommandMap = CommandMap.Create( // looks like "config" is disabled
......@@ -24,18 +32,36 @@ public void ConnectToSSLServer(int port, string sslHost)
{ "cluster", null }
}
),
SslHost = sslHost,
EndPoints = { { "sslredis", port} },
EndPoints = { { host, port} },
AllowAdmin = true,
SyncTimeout = Debugger.IsAttached ? int.MaxValue : 5000
};
config.CertificateValidation += (sender, cert, chain, errors) =>
if(useSsl)
{
Console.WriteLine("cert issued to: " + cert.Subject);
return true; // fingers in ears, pretend we don't know this is wrong
};
using (var muxer = ConnectionMultiplexer.Connect(config, Console.Out))
config.UseSsl = useSsl;
if (specifyHost)
{
config.SslHost = host;
}
config.CertificateValidation += (sender, cert, chain, errors) =>
{
Console.WriteLine("errors: " + errors);
Console.WriteLine("cert issued to: " + cert.Subject);
return true; // fingers in ears, pretend we don't know this is wrong
};
}
var configString = config.ToString();
Console.WriteLine("config: " + configString);
var clone = ConfigurationOptions.Parse(configString);
Assert.AreEqual(configString, clone.ToString(), "config string");
using(var log = new StringWriter())
using (var muxer = ConnectionMultiplexer.Connect(config, log))
{
Console.WriteLine("Connect log:");
Console.WriteLine(log);
Console.WriteLine("====");
muxer.ConnectionFailed += OnConnectionFailed;
muxer.InternalError += OnInternalError;
var db = muxer.GetDatabase();
......@@ -66,13 +92,14 @@ public void ConnectToSSLServer(int port, string sslHost)
// perf: sync/multi-threaded
TestConcurrent(db, key, 30, 10);
TestConcurrent(db, key, 30, 20);
TestConcurrent(db, key, 30, 30);
TestConcurrent(db, key, 30, 40);
TestConcurrent(db, key, 30, 50);
//TestConcurrent(db, key, 30, 20);
//TestConcurrent(db, key, 30, 30);
//TestConcurrent(db, key, 30, 40);
//TestConcurrent(db, key, 30, 50);
}
}
private static void TestConcurrent(IDatabase db, RedisKey key, int SyncLoop, int Threads)
{
long value;
......
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Security;
using System.Text;
using System.Threading.Tasks;
namespace StackExchange.Redis
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Security;
using System.Text;
using System.Threading.Tasks;
namespace StackExchange.Redis
{
/// <summary>
/// Specifies the proxy that is being used to communicate to redis
/// </summary>
/// </summary>
public enum Proxy
{
/// <summary>
......@@ -22,25 +22,25 @@ public enum Proxy
/// Communication via <a href="https://github.com/twitter/twemproxy">twemproxy</a>
/// </summary>
Twemproxy
}
/// <summary>
/// The options relevant to a set of redis connections
/// </summary>
public sealed class ConfigurationOptions : ICloneable
{
internal const string DefaultTieBreaker = "__Booksleeve_TieBreak", DefaultConfigurationChannel = "__Booksleeve_MasterChanged";
private const string AllowAdminPrefix = "allowAdmin=", SyncTimeoutPrefix = "syncTimeout=",
ServiceNamePrefix = "serviceName=", ClientNamePrefix = "name=", KeepAlivePrefix = "keepAlive=",
VersionPrefix = "version=", ConnectTimeoutPrefix = "connectTimeout=", PasswordPrefix = "password=",
TieBreakerPrefix = "tiebreaker=", WriteBufferPrefix = "writeBuffer=", SslHostPrefix = "sslHost=",
ConfigChannelPrefix = "configChannel=", AbortOnConnectFailPrefix = "abortConnect=", ResolveDnsPrefix = "resolveDns=",
ChannelPrefixPrefix = "channelPrefix=", ProxyPrefix = "proxy=";
}
/// <summary>
/// The options relevant to a set of redis connections
/// </summary>
public sealed class ConfigurationOptions : ICloneable
{
internal const string DefaultTieBreaker = "__Booksleeve_TieBreak", DefaultConfigurationChannel = "__Booksleeve_MasterChanged";
private const string AllowAdminPrefix = "allowAdmin=", SyncTimeoutPrefix = "syncTimeout=",
ServiceNamePrefix = "serviceName=", ClientNamePrefix = "name=", KeepAlivePrefix = "keepAlive=",
VersionPrefix = "version=", ConnectTimeoutPrefix = "connectTimeout=", PasswordPrefix = "password=",
TieBreakerPrefix = "tiebreaker=", WriteBufferPrefix = "writeBuffer=", UseSslPrefix = "ssl=", SslHostPrefix = "sslHost=",
ConfigChannelPrefix = "configChannel=", AbortOnConnectFailPrefix = "abortConnect=", ResolveDnsPrefix = "resolveDns=",
ChannelPrefixPrefix = "channelPrefix=", ProxyPrefix = "proxy=";
private readonly EndPointCollection endpoints = new EndPointCollection();
private bool? allowAdmin, abortOnConnectFail, resolveDns;
private bool? allowAdmin, abortOnConnectFail, resolveDns, useSsl;
private string clientName, serviceName, password, tieBreaker, sslHost, configChannel;
......@@ -52,42 +52,47 @@ public sealed class ConfigurationOptions : ICloneable
private Proxy? proxy;
/// <summary>
/// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication; note
/// that this cannot be specified in the configuration-string.
/// </summary>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1009:DeclareEventHandlersCorrectly")]
/// <summary>
/// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication; note
/// that this cannot be specified in the configuration-string.
/// </summary>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1009:DeclareEventHandlersCorrectly")]
public event LocalCertificateSelectionCallback CertificateSelection;
/// <summary>
/// A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party; note
/// that this cannot be specified in the configuration-string.
/// </summary>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1009:DeclareEventHandlersCorrectly")]
/// <summary>
/// A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party; note
/// that this cannot be specified in the configuration-string.
/// </summary>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1009:DeclareEventHandlersCorrectly")]
public event RemoteCertificateValidationCallback CertificateValidation;
/// <summary>
/// Gets or sets whether connect/configuration timeouts should be explicitly notified via a TimeoutException
/// </summary>
/// <summary>
/// Gets or sets whether connect/configuration timeouts should be explicitly notified via a TimeoutException
/// </summary>
public bool AbortOnConnectFail { get { return abortOnConnectFail ?? true; } set { abortOnConnectFail = value; } }
/// <summary>
/// Indicates whether admin operations should be allowed
/// </summary>
/// <summary>
/// Indicates whether admin operations should be allowed
/// </summary>
public bool AllowAdmin { get { return allowAdmin.GetValueOrDefault(); } set { allowAdmin = value; } }
/// <summary>
/// Automatically encodes and decodes channels
/// </summary>
public RedisChannel ChannelPrefix { get;set; }
/// <summary>
/// The client name to user for all connections
/// </summary>
/// <summary>
/// Indicates whether the connection should be encrypted
/// </summary>
public bool UseSsl { get { return useSsl.GetValueOrDefault(); } set { useSsl = value; } }
/// <summary>
/// Automatically encodes and decodes channels
/// </summary>
public RedisChannel ChannelPrefix { get;set; }
/// <summary>
/// The client name to user for all connections
/// </summary>
public string ClientName { get { return clientName; } set { clientName = value; } }
/// <summary>
/// The command-map associated with this configuration
/// </summary>
/// <summary>
/// The command-map associated with this configuration
/// </summary>
public CommandMap CommandMap
{
get
......@@ -100,7 +105,7 @@ public CommandMap CommandMap
default:
return CommandMap.Default;
}
}
}
set
{
if (value == null) throw new ArgumentNullException("value");
......@@ -108,360 +113,367 @@ public CommandMap CommandMap
}
}
/// <summary>
/// Channel to use for broadcasting and listening for configuration change notification
/// </summary>
/// <summary>
/// Channel to use for broadcasting and listening for configuration change notification
/// </summary>
public string ConfigurationChannel { get { return configChannel ?? DefaultConfigurationChannel; } set { configChannel = value; } }
/// <summary>
/// Specifies the time in milliseconds that should be allowed for connection
/// </summary>
/// <summary>
/// Specifies the time in milliseconds that should be allowed for connection
/// </summary>
public int ConnectTimeout { get { return connectTimeout ?? SyncTimeout; } set { connectTimeout = value; } }
/// <summary>
/// The server version to assume
/// </summary>
/// <summary>
/// The server version to assume
/// </summary>
public Version DefaultVersion { get { return defaultVersion ?? RedisFeatures.v2_0_0; } set { defaultVersion = value; } }
/// <summary>
/// The endpoints defined for this configuration
/// </summary>
/// <summary>
/// The endpoints defined for this configuration
/// </summary>
public EndPointCollection EndPoints { get { return endpoints; } }
/// <summary>
/// Specifies the time in seconds at which connections should be pinged to ensure validity
/// </summary>
/// <summary>
/// Specifies the time in seconds at which connections should be pinged to ensure validity
/// </summary>
public int KeepAlive { get { return keepAlive.GetValueOrDefault(-1); } set { keepAlive = value; } }
/// <summary>
/// The password to use to authenticate with the server
/// </summary>
/// <summary>
/// The password to use to authenticate with the server
/// </summary>
public string Password { get { return password; } set { password = value; } }
/// <summary>
/// Indicates whether admin operations should be allowed
/// </summary>
/// <summary>
/// Indicates whether admin operations should be allowed
/// </summary>
public Proxy Proxy { get { return proxy.GetValueOrDefault(); } set { proxy = value; } }
/// <summary>
/// Indicates whether endpoints should be resolved via DNS before connecting
/// </summary>
/// <summary>
/// Indicates whether endpoints should be resolved via DNS before connecting
/// </summary>
public bool ResolveDns { get { return resolveDns.GetValueOrDefault(); } set { resolveDns = value; } }
/// <summary>
/// The service name used to resolve a service via sentinel
/// </summary>
/// <summary>
/// The service name used to resolve a service via sentinel
/// </summary>
public string ServiceName { get { return serviceName; } set { serviceName = value; } }
/// <summary>
/// Gets or sets the SocketManager instance to be used with these options; if this is null a per-multiplexer
/// SocketManager is created automatically.
/// </summary>
/// <summary>
/// Gets or sets the SocketManager instance to be used with these options; if this is null a per-multiplexer
/// SocketManager is created automatically.
/// </summary>
public SocketManager SocketManager { get;set; }
/// <summary>
/// The target-host to use when validating SSL certificate; setting a value here enables SSL mode
/// </summary>
public string SslHost { get { return sslHost; } set { sslHost = value; } }
/// <summary>
/// Specifies the time in milliseconds that the system should allow for synchronous operations
/// </summary>
public int SyncTimeout { get { return syncTimeout.GetValueOrDefault(1000); } set { syncTimeout = value; } }
/// <summary>
/// Tie-breaker used to choose between masters (must match the endpoint exactly)
/// </summary>
public string TieBreaker { get { return tieBreaker ?? DefaultTieBreaker; } set { tieBreaker = value; } }
/// <summary>
/// The size of the output buffer to use
/// </summary>
public int WriteBuffer { get { return writeBuffer.GetValueOrDefault(4096); } set { writeBuffer = value; } }
internal LocalCertificateSelectionCallback CertificateSelectionCallback { get { return CertificateSelection; } private set { CertificateSelection = value; } }
// these just rip out the underlying handlers, bypassing the event accessors - needed when creating the SSL stream
internal RemoteCertificateValidationCallback CertificateValidationCallback { get { return CertificateValidation; } private set { CertificateValidation = value; } }
/// <summary>
/// Parse the configuration from a comma-delimited configuration string
/// </summary>
public static ConfigurationOptions Parse(string configuration)
{
var options = new ConfigurationOptions();
options.DoParse(configuration);
return options;
}
/// <summary>
/// Create a copy of the configuration
/// </summary>
public ConfigurationOptions Clone()
{
var options = new ConfigurationOptions
{
clientName = clientName,
serviceName = serviceName,
keepAlive = keepAlive,
syncTimeout = syncTimeout,
allowAdmin = allowAdmin,
defaultVersion = defaultVersion,
connectTimeout = connectTimeout,
password = password,
tieBreaker = tieBreaker,
writeBuffer = writeBuffer,
sslHost = sslHost,
configChannel = configChannel,
abortOnConnectFail = abortOnConnectFail,
resolveDns = resolveDns,
proxy = proxy,
commandMap = commandMap,
CertificateValidationCallback = CertificateValidationCallback,
CertificateSelectionCallback = CertificateSelectionCallback,
ChannelPrefix = ChannelPrefix.Clone(),
SocketManager = SocketManager,
};
foreach (var item in endpoints)
options.endpoints.Add(item);
return options;
}
/// <summary>
/// Returns the effective configuration string for this configuration
/// </summary>
public override string ToString()
{
var sb = new StringBuilder();
foreach (var endpoint in endpoints)
{
Append(sb, Format.ToString(endpoint));
}
Append(sb, ClientNamePrefix, clientName);
Append(sb, ServiceNamePrefix, serviceName);
Append(sb, KeepAlivePrefix, keepAlive);
Append(sb, SyncTimeoutPrefix, syncTimeout);
Append(sb, AllowAdminPrefix, allowAdmin);
Append(sb, VersionPrefix, defaultVersion);
Append(sb, ConnectTimeoutPrefix, connectTimeout);
Append(sb, PasswordPrefix, password);
Append(sb, TieBreakerPrefix, tieBreaker);
Append(sb, WriteBufferPrefix, writeBuffer);
Append(sb, SslHostPrefix, sslHost);
Append(sb, ConfigChannelPrefix, configChannel);
Append(sb, AbortOnConnectFailPrefix, abortOnConnectFail);
Append(sb, ResolveDnsPrefix, resolveDns);
Append(sb, ChannelPrefixPrefix, (string)ChannelPrefix);
Append(sb, ProxyPrefix, proxy);
if(commandMap != null) commandMap.AppendDeltas(sb);
return sb.ToString();
}
internal bool HasDnsEndPoints()
{
foreach (var endpoint in endpoints) if (endpoint is DnsEndPoint) return true;
return false;
}
internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, TextWriter log)
{
Dictionary<string, IPAddress> cache = new Dictionary<string, IPAddress>(StringComparer.InvariantCultureIgnoreCase);
for (int i = 0; i < endpoints.Count; i++)
{
var dns = endpoints[i] as DnsEndPoint;
if (dns != null)
{
try
{
IPAddress ip;
if (dns.Host == ".")
{
endpoints[i] = new IPEndPoint(IPAddress.Loopback, dns.Port);
}
else if (cache.TryGetValue(dns.Host, out ip))
{ // use cache
endpoints[i] = new IPEndPoint(ip, dns.Port);
}
else
{
multiplexer.LogLocked(log, "Using DNS to resolve '{0}'...", dns.Host);
var ips = await Dns.GetHostAddressesAsync(dns.Host).ObserveErrors().ForAwait();
if (ips.Length == 1)
{
ip = ips[0];
multiplexer.LogLocked(log, "'{0}' => {1}", dns.Host, ip);
cache[dns.Host] = ip;
endpoints[i] = new IPEndPoint(ip, dns.Port);
}
}
}
catch (Exception ex)
{
multiplexer.OnInternalError(ex);
multiplexer.LogLocked(log, ex.Message);
}
}
}
}
static void Append(StringBuilder sb, object value)
{
if (value == null) return;
string s = Format.ToString(value);
if (!string.IsNullOrWhiteSpace(s))
{
if (sb.Length != 0) sb.Append(',');
sb.Append(s);
}
}
static void Append(StringBuilder sb, string prefix, object value)
{
if (value == null) return;
string s = value.ToString();
if (!string.IsNullOrWhiteSpace(s))
{
if (sb.Length != 0) sb.Append(',');
sb.Append(prefix).Append(s);
}
}
static bool IsOption(string option, string prefix)
{
return option.StartsWith(prefix, StringComparison.InvariantCultureIgnoreCase);
}
void Clear()
{
clientName = serviceName = password = tieBreaker = sslHost = configChannel = null;
keepAlive = syncTimeout = connectTimeout = writeBuffer = null;
allowAdmin = abortOnConnectFail = resolveDns = null;
defaultVersion = null;
endpoints.Clear();
commandMap = null;
CertificateSelection = null;
CertificateValidation = null;
ChannelPrefix = default(RedisChannel);
SocketManager = null;
}
object ICloneable.Clone() { return Clone(); }
private void DoParse(string configuration)
{
Clear();
if (!string.IsNullOrWhiteSpace(configuration))
{
// break it down by commas
var arr = configuration.Split(StringSplits.Comma);
Dictionary<string, string> map = null;
foreach (var paddedOption in arr)
{
var option = paddedOption.Trim();
if (string.IsNullOrWhiteSpace(option)) continue;
// check for special tokens
int idx = option.IndexOf('=');
if (idx > 0)
{
var value = option.Substring(idx + 1).Trim();
if (IsOption(option, SyncTimeoutPrefix))
{
int tmp;
if (Format.TryParseInt32(value.Trim(), out tmp) && tmp > 0) SyncTimeout = tmp;
}
else if (IsOption(option, AllowAdminPrefix))
{
bool tmp;
if (Format.TryParseBoolean(value.Trim(), out tmp)) AllowAdmin = tmp;
}
else if (IsOption(option, AbortOnConnectFailPrefix))
{
bool tmp;
if (Format.TryParseBoolean(value.Trim(), out tmp)) AbortOnConnectFail = tmp;
}
else if (IsOption(option, ResolveDnsPrefix))
{
bool tmp;
if (Format.TryParseBoolean(value.Trim(), out tmp)) ResolveDns = tmp;
}
else if (IsOption(option, ServiceNamePrefix))
{
ServiceName = value.Trim();
}
else if (IsOption(option, ClientNamePrefix))
{
ClientName = value.Trim();
}
else if (IsOption(option, ChannelPrefixPrefix))
{
ChannelPrefix = value.Trim();
}
else if (IsOption(option, ConfigChannelPrefix))
{
ConfigurationChannel = value.Trim();
}
else if (IsOption(option, KeepAlivePrefix))
{
int tmp;
if (Format.TryParseInt32(value.Trim(), out tmp)) KeepAlive = tmp;
}
else if (IsOption(option, ConnectTimeoutPrefix))
{
int tmp;
if (Format.TryParseInt32(value.Trim(), out tmp)) ConnectTimeout = tmp;
}
else if (IsOption(option, VersionPrefix))
{
Version tmp;
if (Version.TryParse(value.Trim(), out tmp)) DefaultVersion = tmp;
}
else if (IsOption(option, PasswordPrefix))
{
Password = value.Trim();
}
else if (IsOption(option, TieBreakerPrefix))
{
TieBreaker = value.Trim();
}
else if (IsOption(option, SslHostPrefix))
{
SslHost = value.Trim();
}
else if (IsOption(option, WriteBufferPrefix))
{
int tmp;
if (Format.TryParseInt32(value.Trim(), out tmp)) WriteBuffer = tmp;
/// <summary>
/// The target-host to use when validating SSL certificate; setting a value here enables SSL mode
/// </summary>
public string SslHost { get { return sslHost; } set { sslHost = value; } }
/// <summary>
/// Specifies the time in milliseconds that the system should allow for synchronous operations
/// </summary>
public int SyncTimeout { get { return syncTimeout.GetValueOrDefault(1000); } set { syncTimeout = value; } }
/// <summary>
/// Tie-breaker used to choose between masters (must match the endpoint exactly)
/// </summary>
public string TieBreaker { get { return tieBreaker ?? DefaultTieBreaker; } set { tieBreaker = value; } }
/// <summary>
/// The size of the output buffer to use
/// </summary>
public int WriteBuffer { get { return writeBuffer.GetValueOrDefault(4096); } set { writeBuffer = value; } }
internal LocalCertificateSelectionCallback CertificateSelectionCallback { get { return CertificateSelection; } private set { CertificateSelection = value; } }
// these just rip out the underlying handlers, bypassing the event accessors - needed when creating the SSL stream
internal RemoteCertificateValidationCallback CertificateValidationCallback { get { return CertificateValidation; } private set { CertificateValidation = value; } }
/// <summary>
/// Parse the configuration from a comma-delimited configuration string
/// </summary>
public static ConfigurationOptions Parse(string configuration)
{
var options = new ConfigurationOptions();
options.DoParse(configuration);
return options;
}
/// <summary>
/// Create a copy of the configuration
/// </summary>
public ConfigurationOptions Clone()
{
var options = new ConfigurationOptions
{
clientName = clientName,
serviceName = serviceName,
keepAlive = keepAlive,
syncTimeout = syncTimeout,
allowAdmin = allowAdmin,
defaultVersion = defaultVersion,
connectTimeout = connectTimeout,
password = password,
tieBreaker = tieBreaker,
writeBuffer = writeBuffer,
useSsl = useSsl,
sslHost = sslHost,
configChannel = configChannel,
abortOnConnectFail = abortOnConnectFail,
resolveDns = resolveDns,
proxy = proxy,
commandMap = commandMap,
CertificateValidationCallback = CertificateValidationCallback,
CertificateSelectionCallback = CertificateSelectionCallback,
ChannelPrefix = ChannelPrefix.Clone(),
SocketManager = SocketManager,
};
foreach (var item in endpoints)
options.endpoints.Add(item);
return options;
}
/// <summary>
/// Returns the effective configuration string for this configuration
/// </summary>
public override string ToString()
{
var sb = new StringBuilder();
foreach (var endpoint in endpoints)
{
Append(sb, Format.ToString(endpoint));
}
Append(sb, ClientNamePrefix, clientName);
Append(sb, ServiceNamePrefix, serviceName);
Append(sb, KeepAlivePrefix, keepAlive);
Append(sb, SyncTimeoutPrefix, syncTimeout);
Append(sb, AllowAdminPrefix, allowAdmin);
Append(sb, VersionPrefix, defaultVersion);
Append(sb, ConnectTimeoutPrefix, connectTimeout);
Append(sb, PasswordPrefix, password);
Append(sb, TieBreakerPrefix, tieBreaker);
Append(sb, WriteBufferPrefix, writeBuffer);
Append(sb, UseSslPrefix, useSsl);
Append(sb, SslHostPrefix, sslHost);
Append(sb, ConfigChannelPrefix, configChannel);
Append(sb, AbortOnConnectFailPrefix, abortOnConnectFail);
Append(sb, ResolveDnsPrefix, resolveDns);
Append(sb, ChannelPrefixPrefix, (string)ChannelPrefix);
Append(sb, ProxyPrefix, proxy);
if(commandMap != null) commandMap.AppendDeltas(sb);
return sb.ToString();
}
internal bool HasDnsEndPoints()
{
foreach (var endpoint in endpoints) if (endpoint is DnsEndPoint) return true;
return false;
}
internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, TextWriter log)
{
Dictionary<string, IPAddress> cache = new Dictionary<string, IPAddress>(StringComparer.InvariantCultureIgnoreCase);
for (int i = 0; i < endpoints.Count; i++)
{
var dns = endpoints[i] as DnsEndPoint;
if (dns != null)
{
try
{
IPAddress ip;
if (dns.Host == ".")
{
endpoints[i] = new IPEndPoint(IPAddress.Loopback, dns.Port);
}
else if (cache.TryGetValue(dns.Host, out ip))
{ // use cache
endpoints[i] = new IPEndPoint(ip, dns.Port);
}
else
{
multiplexer.LogLocked(log, "Using DNS to resolve '{0}'...", dns.Host);
var ips = await Dns.GetHostAddressesAsync(dns.Host).ObserveErrors().ForAwait();
if (ips.Length == 1)
{
ip = ips[0];
multiplexer.LogLocked(log, "'{0}' => {1}", dns.Host, ip);
cache[dns.Host] = ip;
endpoints[i] = new IPEndPoint(ip, dns.Port);
}
}
}
catch (Exception ex)
{
multiplexer.OnInternalError(ex);
multiplexer.LogLocked(log, ex.Message);
}
}
}
}
static void Append(StringBuilder sb, object value)
{
if (value == null) return;
string s = Format.ToString(value);
if (!string.IsNullOrWhiteSpace(s))
{
if (sb.Length != 0) sb.Append(',');
sb.Append(s);
}
}
static void Append(StringBuilder sb, string prefix, object value)
{
if (value == null) return;
string s = value.ToString();
if (!string.IsNullOrWhiteSpace(s))
{
if (sb.Length != 0) sb.Append(',');
sb.Append(prefix).Append(s);
}
}
static bool IsOption(string option, string prefix)
{
return option.StartsWith(prefix, StringComparison.InvariantCultureIgnoreCase);
}
void Clear()
{
clientName = serviceName = password = tieBreaker = sslHost = configChannel = null;
keepAlive = syncTimeout = connectTimeout = writeBuffer = null;
allowAdmin = abortOnConnectFail = resolveDns = useSsl = null;
defaultVersion = null;
endpoints.Clear();
commandMap = null;
CertificateSelection = null;
CertificateValidation = null;
ChannelPrefix = default(RedisChannel);
SocketManager = null;
}
object ICloneable.Clone() { return Clone(); }
private void DoParse(string configuration)
{
Clear();
if (!string.IsNullOrWhiteSpace(configuration))
{
// break it down by commas
var arr = configuration.Split(StringSplits.Comma);
Dictionary<string, string> map = null;
foreach (var paddedOption in arr)
{
var option = paddedOption.Trim();
if (string.IsNullOrWhiteSpace(option)) continue;
// check for special tokens
int idx = option.IndexOf('=');
if (idx > 0)
{
var value = option.Substring(idx + 1).Trim();
if (IsOption(option, SyncTimeoutPrefix))
{
int tmp;
if (Format.TryParseInt32(value.Trim(), out tmp) && tmp > 0) SyncTimeout = tmp;
}
else if (IsOption(option, AllowAdminPrefix))
{
bool tmp;
if (Format.TryParseBoolean(value.Trim(), out tmp)) AllowAdmin = tmp;
}
else if (IsOption(option, AbortOnConnectFailPrefix))
{
bool tmp;
if (Format.TryParseBoolean(value.Trim(), out tmp)) AbortOnConnectFail = tmp;
}
else if (IsOption(option, ResolveDnsPrefix))
{
bool tmp;
if (Format.TryParseBoolean(value.Trim(), out tmp)) ResolveDns = tmp;
}
else if (IsOption(option, ServiceNamePrefix))
{
ServiceName = value.Trim();
}
else if (IsOption(option, ClientNamePrefix))
{
ClientName = value.Trim();
}
else if (IsOption(option, ChannelPrefixPrefix))
{
ChannelPrefix = value.Trim();
}
else if (IsOption(option, ConfigChannelPrefix))
{
ConfigurationChannel = value.Trim();
}
else if (IsOption(option, KeepAlivePrefix))
{
int tmp;
if (Format.TryParseInt32(value.Trim(), out tmp)) KeepAlive = tmp;
}
else if (IsOption(option, ConnectTimeoutPrefix))
{
int tmp;
if (Format.TryParseInt32(value.Trim(), out tmp)) ConnectTimeout = tmp;
}
else if (IsOption(option, VersionPrefix))
{
Version tmp;
if (Version.TryParse(value.Trim(), out tmp)) DefaultVersion = tmp;
}
else if (IsOption(option, PasswordPrefix))
{
Password = value.Trim();
}
else if (IsOption(option, TieBreakerPrefix))
{
TieBreaker = value.Trim();
}
else if (IsOption(option, UseSslPrefix))
{
bool tmp;
if (Format.TryParseBoolean(value.Trim(), out tmp)) UseSsl = tmp;
}
else if (IsOption(option, SslHostPrefix))
{
SslHost = value.Trim();
}
else if (IsOption(option, WriteBufferPrefix))
{
int tmp;
if (Format.TryParseInt32(value.Trim(), out tmp)) WriteBuffer = tmp;
} else if(IsOption(option, ProxyPrefix))
{
Proxy tmp;
if (Enum.TryParse(option, true, out tmp)) Proxy = tmp;
}
else if(option[0]=='$')
{
RedisCommand cmd;
option = option.Substring(1, idx-1);
if (Enum.TryParse(option, true, out cmd))
{
if (map == null) map = new Dictionary<string, string>(StringComparer.InvariantCultureIgnoreCase);
map[option] = value;
}
}
else
{
ConnectionMultiplexer.TraceWithoutContext("Unknown configuration option:" + option);
}
}
else
{
var ep = Format.TryParseEndPoint(option);
if (ep != null && !endpoints.Contains(ep)) endpoints.Add(ep);
}
}
}
else if(option[0]=='$')
{
RedisCommand cmd;
option = option.Substring(1, idx-1);
if (Enum.TryParse(option, true, out cmd))
{
if (map == null) map = new Dictionary<string, string>(StringComparer.InvariantCultureIgnoreCase);
map[option] = value;
}
}
else
{
ConnectionMultiplexer.TraceWithoutContext("Unknown configuration option:" + option);
}
}
else
{
var ep = Format.TryParseEndPoint(option);
if (ep != null && !endpoints.Contains(ep)) endpoints.Add(ep);
}
}
if (map != null && map.Count != 0)
{
this.CommandMap = CommandMap.Create(map);
}
}
}
}
}
}
}
}
}
}
......@@ -79,6 +79,20 @@ internal static string ToString(EndPoint endpoint)
return dns.Host + ":" + Format.ToString(dns.Port);
}
}
internal static string ToStringHostOnly(EndPoint endpoint)
{
var dns = endpoint as DnsEndPoint;
if (dns != null)
{
return dns.Host;
}
var ip = endpoint as IPEndPoint;
if(ip != null)
{
return ip.Address.ToString();
}
return "";
}
internal static bool TryGetHostPort(EndPoint endpoint, out string host, out int port)
{
......
......@@ -522,14 +522,17 @@ SocketMode ISocketCallback.Connected(Stream stream)
// [network]<==[ssl]<==[logging]<==[buffered]
var config = multiplexer.RawConfig;
if (!string.IsNullOrWhiteSpace(config.SslHost))
if(config.UseSsl)
{
var host = config.SslHost;
if (string.IsNullOrWhiteSpace(host)) host = Format.ToStringHostOnly(bridge.ServerEndPoint.EndPoint);
var ssl = new SslStream(stream, false, config.CertificateValidationCallback, config.CertificateSelectionCallback
#if !MONO
, EncryptionPolicy.RequireEncryption
#endif
);
ssl.AuthenticateAsClient(config.SslHost);
ssl.AuthenticateAsClient(host);
if (!ssl.IsEncrypted)
{
RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure);
......
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
namespace StackExchange.Redis
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
namespace StackExchange.Redis
{
internal enum SocketMode
{
Abort,
Poll,
Async
internal enum SocketMode
{
Abort,
Poll,
Async
}
/// <summary>
/// Allows callbacks from SocketManager as work is discovered
/// </summary>
internal interface ISocketCallback
{
/// <summary>
/// Indicates that a socket has connected
/// </summary>
SocketMode Connected(Stream stream);
/// <summary>
/// Indicates that the socket has signalled an error condition
/// </summary>
/// <summary>
/// Allows callbacks from SocketManager as work is discovered
/// </summary>
internal interface ISocketCallback
{
/// <summary>
/// Indicates that a socket has connected
/// </summary>
SocketMode Connected(Stream stream);
/// <summary>
/// Indicates that the socket has signalled an error condition
/// </summary>
void Error();
void OnHeartbeat();
/// <summary>
/// Indicates that data is available on the socket, and that the consumer should read synchronously from the socket while there is data
/// </summary>
void Read();
/// <summary>
/// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously
/// </summary>
void StartReading();
/// <summary>
/// Indicates that data is available on the socket, and that the consumer should read synchronously from the socket while there is data
/// </summary>
void Read();
/// <summary>
/// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously
/// </summary>
void StartReading();
}
internal struct SocketToken
{
internal readonly Socket Socket;
public SocketToken(Socket socket)
{
this.Socket = socket;
}
public int Available { get { return Socket == null ? 0 : Socket.Available; } }
public bool HasValue { get { return Socket != null; } }
internal struct SocketToken
{
internal readonly Socket Socket;
public SocketToken(Socket socket)
{
this.Socket = socket;
}
public int Available { get { return Socket == null ? 0 : Socket.Available; } }
public bool HasValue { get { return Socket != null; } }
}
/// <summary>
/// A SocketManager monitors multiple sockets for availability of data; this is done using
/// the Socket.Select API and a dedicated reader-thread, which allows for fast responses
/// even when the system is under ambient load.
/// </summary>
public sealed partial class SocketManager : IDisposable
/// <summary>
/// A SocketManager monitors multiple sockets for availability of data; this is done using
/// the Socket.Select API and a dedicated reader-thread, which allows for fast responses
/// even when the system is under ambient load.
/// </summary>
public sealed partial class SocketManager : IDisposable
{
private static readonly ParameterizedThreadStart writeAllQueues = context =>
{
try { ((SocketManager)context).WriteAllQueues(); } catch { }
private static readonly ParameterizedThreadStart writeAllQueues = context =>
{
try { ((SocketManager)context).WriteAllQueues(); } catch { }
};
private static readonly WaitCallback writeOneQueue = context =>
{
try { ((SocketManager)context).WriteOneQueue(); } catch { }
private static readonly WaitCallback writeOneQueue = context =>
{
try { ((SocketManager)context).WriteOneQueue(); } catch { }
};
private readonly string name;
......@@ -78,211 +78,211 @@ public sealed partial class SocketManager : IDisposable
bool isDisposed;
/// <summary>
/// Creates a new (optionally named) SocketManager instance
/// </summary>
public SocketManager(string name = null)
{
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
this.name = name;
// we need a dedicated writer, because when under heavy ambient load
// (a busy asp.net site, for example), workers are not reliable enough
Thread dedicatedWriter = new Thread(writeAllQueues, 32 * 1024); // don't need a huge stack;
dedicatedWriter.Priority = ThreadPriority.AboveNormal; // time critical
dedicatedWriter.Name = name + ":Write";
dedicatedWriter.IsBackground = true; // should not keep process alive
dedicatedWriter.Start(this); // will self-exit when disposed
/// <summary>
/// Creates a new (optionally named) SocketManager instance
/// </summary>
public SocketManager(string name = null)
{
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
this.name = name;
// we need a dedicated writer, because when under heavy ambient load
// (a busy asp.net site, for example), workers are not reliable enough
Thread dedicatedWriter = new Thread(writeAllQueues, 32 * 1024); // don't need a huge stack;
dedicatedWriter.Priority = ThreadPriority.AboveNormal; // time critical
dedicatedWriter.Name = name + ":Write";
dedicatedWriter.IsBackground = true; // should not keep process alive
dedicatedWriter.Start(this); // will self-exit when disposed
}
private enum CallbackOperation
{
Read,
Error
private enum CallbackOperation
{
Read,
Error
}
/// <summary>
/// Gets the name of this SocketManager instance
/// </summary>
/// <summary>
/// Gets the name of this SocketManager instance
/// </summary>
public string Name { get { return name; } }
/// <summary>
/// Releases all resources associated with this instance
/// </summary>
public void Dispose()
{
lock (writeQueue)
{
// make sure writer threads know to exit
isDisposed = true;
Monitor.PulseAll(writeQueue);
/// <summary>
/// Releases all resources associated with this instance
/// </summary>
public void Dispose()
{
lock (writeQueue)
{
// make sure writer threads know to exit
isDisposed = true;
Monitor.PulseAll(writeQueue);
}
OnDispose();
OnDispose();
}
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback)
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.NoDelay = true;
socket.BeginConnect(endpoint, EndConnect, Tuple.Create(socket, callback));
return new SocketToken(socket);
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback)
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.NoDelay = true;
socket.BeginConnect(endpoint, EndConnect, Tuple.Create(socket, callback));
return new SocketToken(socket);
}
internal void RequestWrite(PhysicalBridge bridge, bool forced)
{
if (Interlocked.CompareExchange(ref bridge.inWriteQueue, 1, 0) == 0 || forced)
{
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
if (writeQueue.Count == 1)
{
Monitor.PulseAll(writeQueue);
}
else if (writeQueue.Count >= 2)
{ // struggling are we? let's have some help dealing with the backlog
ThreadPool.QueueUserWorkItem(writeOneQueue, this);
}
}
}
internal void RequestWrite(PhysicalBridge bridge, bool forced)
{
if (Interlocked.CompareExchange(ref bridge.inWriteQueue, 1, 0) == 0 || forced)
{
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
if (writeQueue.Count == 1)
{
Monitor.PulseAll(writeQueue);
}
else if (writeQueue.Count >= 2)
{ // struggling are we? let's have some help dealing with the backlog
ThreadPool.QueueUserWorkItem(writeOneQueue, this);
}
}
}
}
internal void Shutdown(SocketToken token)
{
Shutdown(token.Socket);
internal void Shutdown(SocketToken token)
{
Shutdown(token.Socket);
}
private void EndConnect(IAsyncResult ar)
{
Tuple<Socket, ISocketCallback> tuple = null;
try
{
tuple = (Tuple<Socket, ISocketCallback>)ar.AsyncState;
var socket = tuple.Item1;
var callback = tuple.Item2;
socket.EndConnect(ar);
var netStream = new NetworkStream(socket, false);
var socketMode = callback == null ? SocketMode.Abort : callback.Connected(netStream);
switch (socketMode)
{
case SocketMode.Poll:
OnAddRead(socket, callback);
break;
case SocketMode.Async:
private void EndConnect(IAsyncResult ar)
{
Tuple<Socket, ISocketCallback> tuple = null;
try
{
tuple = (Tuple<Socket, ISocketCallback>)ar.AsyncState;
var socket = tuple.Item1;
var callback = tuple.Item2;
socket.EndConnect(ar);
var netStream = new NetworkStream(socket, false);
var socketMode = callback == null ? SocketMode.Abort : callback.Connected(netStream);
switch (socketMode)
{
case SocketMode.Poll:
OnAddRead(socket, callback);
break;
case SocketMode.Async:
try
{ callback.StartReading(); }
{ callback.StartReading(); }
catch
{ Shutdown(socket); }
break;
default:
Shutdown(socket);
break;
}
}
catch
{
if (tuple != null)
{
{ Shutdown(socket); }
break;
default:
Shutdown(socket);
break;
}
}
catch
{
if (tuple != null)
{
try
{ tuple.Item2.Error(); }
catch (Exception ex)
{
Trace.WriteLine(ex);
}
}
}
{ tuple.Item2.Error(); }
catch (Exception ex)
{
Trace.WriteLine(ex);
}
}
}
}
partial void OnDispose();
partial void OnShutdown(Socket socket);
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
private void Shutdown(Socket socket)
{
if (socket != null)
private void Shutdown(Socket socket)
{
if (socket != null)
{
OnShutdown(socket);
try { socket.Shutdown(SocketShutdown.Both); } catch { }
try { socket.Close(); } catch { }
try { socket.Dispose(); } catch { }
}
OnShutdown(socket);
try { socket.Shutdown(SocketShutdown.Both); } catch { }
try { socket.Close(); } catch { }
try { socket.Dispose(); } catch { }
}
}
private void WriteAllQueues()
{
while (true)
{
PhysicalBridge bridge;
lock (writeQueue)
{
if (writeQueue.Count == 0)
{
if (isDisposed) break; // <========= exit point
Monitor.Wait(writeQueue);
if (isDisposed) break; // (woken by Dispose)
if (writeQueue.Count == 0) continue; // still nothing...
}
bridge = writeQueue.Dequeue();
}
switch (bridge.WriteQueue(200))
{
case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
// back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
break;
case WriteResult.CompetingWriter:
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
break;
case WriteResult.NothingToDo:
if (!bridge.ConfirmRemoveFromWriteQueue())
{ // more snuck in; back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
}
break;
}
}
private void WriteAllQueues()
{
while (true)
{
PhysicalBridge bridge;
lock (writeQueue)
{
if (writeQueue.Count == 0)
{
if (isDisposed) break; // <========= exit point
Monitor.Wait(writeQueue);
if (isDisposed) break; // (woken by Dispose)
if (writeQueue.Count == 0) continue; // still nothing...
}
bridge = writeQueue.Dequeue();
}
switch (bridge.WriteQueue(200))
{
case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
// back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
break;
case WriteResult.CompetingWriter:
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
break;
case WriteResult.NothingToDo:
if (!bridge.ConfirmRemoveFromWriteQueue())
{ // more snuck in; back of the line!
lock (writeQueue)
{
writeQueue.Enqueue(bridge);
}
}
break;
}
}
}
private void WriteOneQueue()
{
PhysicalBridge bridge;
lock (writeQueue)
{
bridge = writeQueue.Count == 0 ? null : writeQueue.Dequeue();
}
if (bridge == null) return;
bool keepGoing;
do
{
switch (bridge.WriteQueue(-1))
{
case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
keepGoing = true;
break;
case WriteResult.NothingToDo:
keepGoing = !bridge.ConfirmRemoveFromWriteQueue();
break;
case WriteResult.CompetingWriter:
keepGoing = false;
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
keepGoing = false;
break;
default:
keepGoing = false;
break;
}
} while (keepGoing);
}
}
}
private void WriteOneQueue()
{
PhysicalBridge bridge;
lock (writeQueue)
{
bridge = writeQueue.Count == 0 ? null : writeQueue.Dequeue();
}
if (bridge == null) return;
bool keepGoing;
do
{
switch (bridge.WriteQueue(-1))
{
case WriteResult.MoreWork:
case WriteResult.QueueEmptyAfterWrite:
keepGoing = true;
break;
case WriteResult.NothingToDo:
keepGoing = !bridge.ConfirmRemoveFromWriteQueue();
break;
case WriteResult.CompetingWriter:
keepGoing = false;
break;
case WriteResult.NoConnection:
Interlocked.Exchange(ref bridge.inWriteQueue, 0);
keepGoing = false;
break;
default:
keepGoing = false;
break;
}
} while (keepGoing);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment