Commit 2866adb2 authored by Marc Gravell's avatar Marc Gravell

Merge branch 'mckenzieg1-configurable-socket-mgr-thread-priority'

parents 7b4dc7e1 f6d2e809
......@@ -74,7 +74,7 @@ internal static void Unknown(string key)
internal const string AllowAdmin = "allowAdmin", SyncTimeout = "syncTimeout",
ServiceName = "serviceName", ClientName = "name", KeepAlive = "keepAlive",
Version = "version", ConnectTimeout = "connectTimeout", Password = "password",
TieBreaker = "tiebreaker", WriteBuffer = "writeBuffer", Ssl = "ssl", SslHost = "sslHost",
TieBreaker = "tiebreaker", WriteBuffer = "writeBuffer", Ssl = "ssl", SslHost = "sslHost", HighPrioritySocketThreads = "highPriorityThreads",
ConfigChannel = "configChannel", AbortOnConnectFail = "abortConnect", ResolveDns = "resolveDns",
ChannelPrefix = "channelPrefix", Proxy = "proxy", ConnectRetry = "connectRetry",
ConfigCheckSeconds = "configCheckSeconds", ResponseTimeout = "responseTimeout", DefaultDatabase = "defaultDatabase";
......@@ -83,7 +83,7 @@ internal static void Unknown(string key)
AllowAdmin, SyncTimeout,
ServiceName, ClientName, KeepAlive,
Version, ConnectTimeout, Password,
TieBreaker, WriteBuffer, Ssl, SslHost,
TieBreaker, WriteBuffer, Ssl, SslHost, HighPrioritySocketThreads,
ConfigChannel, AbortOnConnectFail, ResolveDns,
ChannelPrefix, Proxy, ConnectRetry,
ConfigCheckSeconds, DefaultDatabase,
......@@ -103,7 +103,7 @@ public static string TryNormalize(string value)
private readonly EndPointCollection endpoints = new EndPointCollection();
private bool? allowAdmin, abortOnConnectFail, resolveDns, ssl;
private bool? allowAdmin, abortOnConnectFail, highPrioritySocketThreads, resolveDns, ssl;
private string clientName, serviceName, password, tieBreaker, sslHost, configChannel;
......@@ -217,6 +217,11 @@ public CommandMap CommandMap
/// </summary>
public EndPointCollection EndPoints { get { return endpoints; } }
/// <summary>
/// Use ThreadPriority.AboveNormal for SocketManager reader and writer threads (true by default). If false, ThreadPriority.Normal will be used.
/// </summary>
public bool HighPrioritySocketThreads { get { return highPrioritySocketThreads ?? true; } set { highPrioritySocketThreads = value; } }
/// <summary>
/// Specifies the time in seconds at which connections should be pinged to ensure validity
/// </summary>
......@@ -329,6 +334,7 @@ public ConfigurationOptions Clone()
writeBuffer = writeBuffer,
ssl = ssl,
sslHost = sslHost,
highPrioritySocketThreads = highPrioritySocketThreads,
configChannel = configChannel,
abortOnConnectFail = abortOnConnectFail,
resolveDns = resolveDns,
......@@ -379,6 +385,7 @@ public override string ToString()
Append(sb, OptionKeys.WriteBuffer, writeBuffer);
Append(sb, OptionKeys.Ssl, ssl);
Append(sb, OptionKeys.SslHost, sslHost);
Append(sb, OptionKeys.HighPrioritySocketThreads, highPrioritySocketThreads);
Append(sb, OptionKeys.ConfigChannel, configChannel);
Append(sb, OptionKeys.AbortOnConnectFail, abortOnConnectFail);
Append(sb, OptionKeys.ResolveDns, resolveDns);
......@@ -481,7 +488,7 @@ void Clear()
{
clientName = serviceName = password = tieBreaker = sslHost = configChannel = null;
keepAlive = syncTimeout = connectTimeout = writeBuffer = connectRetry = configCheckSeconds = defaultDatabase = null;
allowAdmin = abortOnConnectFail = resolveDns = ssl = null;
allowAdmin = abortOnConnectFail = highPrioritySocketThreads = resolveDns = ssl = null;
defaultVersion = null;
endpoints.Clear();
commandMap = null;
......@@ -579,6 +586,9 @@ private void DoParse(string configuration, bool ignoreUnknown)
case OptionKeys.SslHost:
SslHost = value;
break;
case OptionKeys.HighPrioritySocketThreads:
HighPrioritySocketThreads = OptionKeys.ParseBoolean(key, value);
break;
case OptionKeys.WriteBuffer:
WriteBuffer = OptionKeys.ParseInt32(key, value);
break;
......
......@@ -10,7 +10,7 @@ partial class ConnectionMultiplexer
partial void OnCreateReaderWriter(ConfigurationOptions configuration)
{
this.ownsSocketManager = configuration.SocketManager == null;
this.socketManager = configuration.SocketManager ?? new SocketManager(ClientName);
this.socketManager = configuration.SocketManager ?? new SocketManager(ClientName, configuration.HighPrioritySocketThreads);
}
partial void OnCloseReaderWriter()
......
......@@ -377,7 +377,7 @@ private void ReadImpl()
private void StartReader()
{
var thread = new Thread(read, 32 * 1024); // don't need a huge stack
thread.Priority = ThreadPriority.AboveNormal; // time critical
thread.Priority = useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal;
thread.Name = name + ":Read";
thread.IsBackground = true;
thread.Start(this);
......
......@@ -116,20 +116,27 @@ internal enum ManagerState
private readonly Queue<PhysicalBridge> writeQueue = new Queue<PhysicalBridge>();
bool isDisposed;
private bool useHighPrioritySocketThreads = true;
/// <summary>
/// Creates a new (optionally named) SocketManager instance
/// </summary>
public SocketManager(string name = null)
public SocketManager(string name = null) : this(name, true) { }
/// <summary>
/// Creates a new SocketManager instance
/// </summary>
public SocketManager(string name, bool useHighPrioritySocketThreads)
{
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
this.name = name;
this.useHighPrioritySocketThreads = useHighPrioritySocketThreads;
// we need a dedicated writer, because when under heavy ambient load
// (a busy asp.net site, for example), workers are not reliable enough
#if !CORE_CLR
Thread dedicatedWriter = new Thread(writeAllQueues, 32 * 1024); // don't need a huge stack;
dedicatedWriter.Priority = ThreadPriority.AboveNormal; // time critical
dedicatedWriter.Priority = useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal;
#else
Thread dedicatedWriter = new Thread(writeAllQueues);
#endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment