Commit 66b8773f authored by Marc Gravell's avatar Marc Gravell

Improved heartbeat code; introduced dedicated reader (can be pooled via...

Improved heartbeat code; introduced dedicated reader (can be pooled via SocketManager); tidied solition directory
parent bb74cc18
@..\packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7000
\ No newline at end of file
@..\packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7001
\ No newline at end of file
@..\packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7002
\ No newline at end of file
@..\packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7003
\ No newline at end of file
@..\packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7004
\ No newline at end of file
@..\packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7005
\ No newline at end of file
@..\packages\Redis-64.2.6.12.1\tools\redis-cli.exe -p 6379
\ No newline at end of file
@..\packages\Redis-64.2.6.12.1\tools\redis-cli.exe -p 6381
\ No newline at end of file
@..\packages\Redis-64.2.6.12.1\tools\redis-cli.exe -p 6380
\ No newline at end of file
@..\packages\Redis-64.2.6.12.1\tools\redis-server.exe master.conf
\ No newline at end of file
@..\packages\Redis-64.2.6.12.1\tools\redis-server.exe secure.conf
\ No newline at end of file
@..\packages\Redis-64.2.6.12.1\tools\redis-server.exe slave.conf
\ No newline at end of file
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
namespace StackExchange.Redis.Tests namespace StackExchange.Redis.Tests
{ {
[TestFixture] [TestFixture]
public class ConnectTests : TestBase public class BasicOpsTests : TestBase
{ {
[Test] [Test]
[TestCase(true)] [TestCase(true)]
......
...@@ -67,7 +67,7 @@ public void TalkToNonsenseServer() ...@@ -67,7 +67,7 @@ public void TalkToNonsenseServer()
[Test] [Test]
public void TestManaulHeartbeat() public void TestManaulHeartbeat()
{ {
using (var muxer = Create(keepAlive: 2000)) using (var muxer = Create(keepAlive: 2))
{ {
var conn = muxer.GetDatabase(); var conn = muxer.GetDatabase();
conn.Ping(); conn.Ping();
...@@ -79,7 +79,7 @@ public void TestManaulHeartbeat() ...@@ -79,7 +79,7 @@ public void TestManaulHeartbeat()
var after = muxer.OperationCount; var after = muxer.OperationCount;
Assert.AreEqual(before + 2, after); Assert.IsTrue(after >= before + 4);
} }
} }
...@@ -296,7 +296,7 @@ public void TestAutomaticHeartbeat() ...@@ -296,7 +296,7 @@ public void TestAutomaticHeartbeat()
Thread.Sleep(TimeSpan.FromSeconds(8)); Thread.Sleep(TimeSpan.FromSeconds(8));
var after = innerMuxer.OperationCount; var after = innerMuxer.OperationCount;
Assert.AreEqual(before + 2, after); Assert.IsTrue(after >= before + 4);
} }
} }
......
using System; using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework; using NUnit.Framework;
namespace StackExchange.Redis.Tests namespace StackExchange.Redis.Tests
......
...@@ -106,6 +106,7 @@ static bool IgnoreMethodConventions(MethodInfo method) ...@@ -106,6 +106,7 @@ static bool IgnoreMethodConventions(MethodInfo method)
case "CreateTransaction": case "CreateTransaction":
case "IsConnected": case "IsConnected":
case "SetScan": case "SetScan":
case "SubscribedEndpoint":
return true; return true;
} }
return false; return false;
......
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using NUnit.Framework; using NUnit.Framework;
...@@ -229,52 +230,98 @@ public void TestPatternPubSub(bool preserveOrder) ...@@ -229,52 +230,98 @@ public void TestPatternPubSub(bool preserveOrder)
} }
[Test] [Test]
public void SubscriptionsSurviceMasterSwitch() [TestCase(false)]
[TestCase(true)]
public void SubscriptionsSurviveMasterSwitch(bool useSharedSocketManager)
{ {
using (var a = Create(allowAdmin: true)) using (var a = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager))
using (var b = Create(allowAdmin: true)) using (var b = Create(allowAdmin: true, useSharedSocketManager: useSharedSocketManager))
{ {
RedisChannel channel = Me(); RedisChannel channel = Me();
var subA = a.GetSubscriber(); var subA = a.GetSubscriber();
var subB = b.GetSubscriber(); var subB = b.GetSubscriber();
long masterChanged = 0, aCount = 0, bCount = 0; long masterChanged = 0, aCount = 0, bCount = 0;
a.MasterChanged += delegate { Interlocked.Increment(ref masterChanged); }; a.ConfigurationChangedBroadcast += delegate {
subA.Subscribe(channel, delegate { Interlocked.Increment(ref aCount); }); Console.WriteLine("a noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
subB.Subscribe(channel, delegate { Interlocked.Increment(ref bCount); }); };
b.ConfigurationChangedBroadcast += delegate {
//var epA = subA.IdentifyEndpoint(channel); Console.WriteLine("b noticed config broadcast: " + Interlocked.Increment(ref masterChanged));
//var epB = subB.IdentifyEndpoint(channel); };
//Console.WriteLine(epA); subA.Subscribe(channel, (ch, message) => {
//Console.WriteLine(epB); Console.WriteLine("a got message: " + message);
subA.Publish(channel, "a"); Interlocked.Increment(ref aCount);
subB.Publish(channel, "b"); });
subB.Subscribe(channel, (ch, message) => {
Console.WriteLine("b got message: " + message);
Interlocked.Increment(ref bCount);
});
Assert.IsFalse(a.GetServer(PrimaryServer, PrimaryPort).IsSlave, PrimaryPortString + " is master via a");
Assert.IsTrue(a.GetServer(PrimaryServer, SlavePort).IsSlave, SlavePortString + " is slave via a");
Assert.IsFalse(b.GetServer(PrimaryServer, PrimaryPort).IsSlave, PrimaryPortString + " is master via b");
Assert.IsTrue(b.GetServer(PrimaryServer, SlavePort).IsSlave, SlavePortString + " is slave via b");
var epA = subA.SubscribedEndpoint(channel);
var epB = subB.SubscribedEndpoint(channel);
Console.WriteLine("a: " + EndPointCollection.ToString(epA));
Console.WriteLine("b: " + EndPointCollection.ToString(epB));
subA.Publish(channel, "a1");
subB.Publish(channel, "b1");
subA.Ping(); subA.Ping();
subB.Ping(); subB.Ping();
Assert.AreEqual(0, Interlocked.Read(ref masterChanged), "master");
Assert.AreEqual(2, Interlocked.Read(ref aCount), "a"); Assert.AreEqual(2, Interlocked.Read(ref aCount), "a");
Assert.AreEqual(2, Interlocked.Read(ref bCount), "b"); Assert.AreEqual(2, Interlocked.Read(ref bCount), "b");
Assert.AreEqual(0, Interlocked.Read(ref masterChanged), "master");
try try
{ {
b.GetServer(PrimaryServer, SlavePort).MakeMaster(ReplicationChangeOptions.All); Interlocked.Exchange(ref masterChanged, 0);
Thread.Sleep(100); Interlocked.Exchange(ref aCount, 0);
//epA = subA.IdentifyEndpoint(channel); Interlocked.Exchange(ref bCount, 0);
//epB = subB.IdentifyEndpoint(channel); Console.WriteLine("Changing master...");
//Console.WriteLine(epA); using (var sw = new StringWriter())
//Console.WriteLine(epB); {
subA.Publish(channel, "a"); a.GetServer(PrimaryServer, SlavePort).MakeMaster(ReplicationChangeOptions.All, sw);
subB.Publish(channel, "b"); Console.WriteLine(sw);
}
subA.Ping();
subB.Ping();
Console.WriteLine("Pausing...");
Thread.Sleep(2000);
Assert.IsTrue(a.GetServer(PrimaryServer, PrimaryPort).IsSlave, PrimaryPortString + " is slave via a");
Assert.IsFalse(a.GetServer(PrimaryServer, SlavePort).IsSlave, SlavePortString + " is master via a");
Assert.IsTrue(b.GetServer(PrimaryServer, PrimaryPort).IsSlave, PrimaryPortString + " is slave via b");
Assert.IsFalse(b.GetServer(PrimaryServer, SlavePort).IsSlave, SlavePortString + " is master via b");
Console.WriteLine("Pause complete");
var counters = a.GetCounters();
Console.WriteLine("a outstanding: " + counters.TotalOutstanding);
counters = b.GetCounters();
Console.WriteLine("b outstanding: " + counters.TotalOutstanding);
subA.Ping(); subA.Ping();
subB.Ping();
epA = subA.SubscribedEndpoint(channel);
epB = subB.SubscribedEndpoint(channel);
Console.WriteLine("a: " + EndPointCollection.ToString(epA));
Console.WriteLine("b: " + EndPointCollection.ToString(epB));
Console.WriteLine("a2 sent to: " + subA.Publish(channel, "a2"));
Console.WriteLine("b2 sent to: " + subB.Publish(channel, "b2"));
subA.Ping(); subA.Ping();
subB.Ping(); subB.Ping();
Assert.AreEqual(2, Interlocked.Read(ref masterChanged), "master"); Console.WriteLine("Checking...");
Assert.AreEqual(4, Interlocked.Read(ref aCount), "a");
Assert.AreEqual(4, Interlocked.Read(ref bCount), "b"); Assert.AreEqual(2, Interlocked.Read(ref aCount), "a");
Assert.AreEqual(2, Interlocked.Read(ref bCount), "b");
Assert.AreEqual(4, Interlocked.CompareExchange(ref masterChanged, 0, 0), "master");
} }
finally finally
{ {
Console.WriteLine("Restoring configuration...");
try try
{ {
a.GetServer(PrimaryServer, PrimaryPort).MakeMaster(ReplicationChangeOptions.All); a.GetServer(PrimaryServer, PrimaryPort).MakeMaster(ReplicationChangeOptions.All);
......
...@@ -13,8 +13,19 @@ ...@@ -13,8 +13,19 @@
namespace StackExchange.Redis.Tests namespace StackExchange.Redis.Tests
{ {
public abstract class TestBase public abstract class TestBase : IDisposable
{ {
private readonly SocketManager socketManager;
protected TestBase()
{
socketManager = new SocketManager(GetType().Name);
}
public void Dispose()
{
socketManager.Dispose();
}
#if VERBOSE #if VERBOSE
protected const int AsyncOpsQty = 100, SyncOpsQty = 10; protected const int AsyncOpsQty = 100, SyncOpsQty = 10;
#else #else
...@@ -85,7 +96,7 @@ public void Teardown() ...@@ -85,7 +96,7 @@ public void Teardown()
} }
protected const int PrimaryPort = 6379, SlavePort = 6380, SecurePort = 6381; protected const int PrimaryPort = 6379, SlavePort = 6380, SecurePort = 6381;
protected const string PrimaryServer = "127.0.0.1", SecurePassword = "changeme", PrimaryPortString = "6379", SecurePortString = "6381"; protected const string PrimaryServer = "127.0.0.1", SecurePassword = "changeme", PrimaryPortString = "6379", SlavePortString = "6380", SecurePortString = "6381";
internal static Task Swallow(Task task) internal static Task Swallow(Task task)
{ {
if (task != null) task.ContinueWith(swallowErrors, TaskContinuationOptions.OnlyOnFaulted); if (task != null) task.ContinueWith(swallowErrors, TaskContinuationOptions.OnlyOnFaulted);
...@@ -115,7 +126,7 @@ protected IServer GetServer(ConnectionMultiplexer muxer) ...@@ -115,7 +126,7 @@ protected IServer GetServer(ConnectionMultiplexer muxer)
string clientName = null, int? syncTimeout = null, bool? allowAdmin = null, int? keepAlive = null, string clientName = null, int? syncTimeout = null, bool? allowAdmin = null, int? keepAlive = null,
int? connectTimeout = null, string password = null, string tieBreaker = null, TextWriter log = null, int? connectTimeout = null, string password = null, string tieBreaker = null, TextWriter log = null,
bool fail = true, string[] disabledCommands = null, bool checkConnect = true, bool pause = true, string failMessage = null, bool fail = true, string[] disabledCommands = null, bool checkConnect = true, bool pause = true, string failMessage = null,
string channelPrefix = null) string channelPrefix = null, bool useSharedSocketManager = true)
{ {
if(pause) Thread.Sleep(500); // get a lot of glitches when hammering new socket creations etc; pace it out a bit if(pause) Thread.Sleep(500); // get a lot of glitches when hammering new socket creations etc; pace it out a bit
string configuration = GetConfiguration(); string configuration = GetConfiguration();
...@@ -127,6 +138,7 @@ protected IServer GetServer(ConnectionMultiplexer muxer) ...@@ -127,6 +138,7 @@ protected IServer GetServer(ConnectionMultiplexer muxer)
map[cmd] = null; map[cmd] = null;
config.CommandMap = CommandMap.Create(map); config.CommandMap = CommandMap.Create(map);
} }
if (useSharedSocketManager) config.SocketManager = socketManager;
if (channelPrefix != null) config.ChannelPrefix = channelPrefix; if (channelPrefix != null) config.ChannelPrefix = channelPrefix;
if (tieBreaker != null) config.TieBreaker = tieBreaker; if (tieBreaker != null) config.TieBreaker = tieBreaker;
if (password != null) config.Password = string.IsNullOrEmpty(password) ? null : password; if (password != null) config.Password = string.IsNullOrEmpty(password) ? null : password;
......
...@@ -12,23 +12,23 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{D3090D ...@@ -12,23 +12,23 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{D3090D
.nuget\packages.config = .nuget\packages.config .nuget\packages.config = .nuget\packages.config
EndProjectSection EndProjectSection
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Redis", "Redis", "{29A8EF11-C420-41F5-B8DC-BB586EF4D827}" Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Redis Configs", "Redis Configs", "{29A8EF11-C420-41F5-B8DC-BB586EF4D827}"
ProjectSection(SolutionItems) = preProject ProjectSection(SolutionItems) = preProject
master.conf = master.conf Redis Configs\master.conf = Redis Configs\master.conf
redis-cli 7000.cmd = redis-cli 7000.cmd Redis Configs\redis-cli 7000.cmd = Redis Configs\redis-cli 7000.cmd
redis-cli 7001.cmd = redis-cli 7001.cmd Redis Configs\redis-cli 7001.cmd = Redis Configs\redis-cli 7001.cmd
redis-cli 7002.cmd = redis-cli 7002.cmd Redis Configs\redis-cli 7002.cmd = Redis Configs\redis-cli 7002.cmd
redis-cli 7003.cmd = redis-cli 7003.cmd Redis Configs\redis-cli 7003.cmd = Redis Configs\redis-cli 7003.cmd
redis-cli 7004.cmd = redis-cli 7004.cmd Redis Configs\redis-cli 7004.cmd = Redis Configs\redis-cli 7004.cmd
redis-cli 7005.cmd = redis-cli 7005.cmd Redis Configs\redis-cli 7005.cmd = Redis Configs\redis-cli 7005.cmd
redis-cli master.cmd = redis-cli master.cmd Redis Configs\redis-cli master.cmd = Redis Configs\redis-cli master.cmd
redis-cli secure.cmd = redis-cli secure.cmd Redis Configs\redis-cli secure.cmd = Redis Configs\redis-cli secure.cmd
redis-cli slave.cmd = redis-cli slave.cmd Redis Configs\redis-cli slave.cmd = Redis Configs\redis-cli slave.cmd
redis-server master.cmd = redis-server master.cmd Redis Configs\redis-server master.cmd = Redis Configs\redis-server master.cmd
redis-server secure.cmd = redis-server secure.cmd Redis Configs\redis-server secure.cmd = Redis Configs\redis-server secure.cmd
redis-server slave.cmd = redis-server slave.cmd Redis Configs\redis-server slave.cmd = Redis Configs\redis-server slave.cmd
secure.conf = secure.conf Redis Configs\secure.conf = Redis Configs\secure.conf
slave.conf = slave.conf Redis Configs\slave.conf = Redis Configs\slave.conf
EndProjectSection EndProjectSection
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{709E0CE4-F0BA-4933-A4FD-4A8B6668A5D4}" Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{709E0CE4-F0BA-4933-A4FD-4A8B6668A5D4}"
......
...@@ -63,7 +63,7 @@ ...@@ -63,7 +63,7 @@
<Reference Include="System.IO.Compression" /> <Reference Include="System.IO.Compression" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="StackExchange\RedisChannel.cs" /> <Compile Include="StackExchange\Redis\RedisChannel.cs" />
<Compile Include="StackExchange\Redis\Bitwise.cs" /> <Compile Include="StackExchange\Redis\Bitwise.cs" />
<Compile Include="StackExchange\Redis\ClientFlags.cs" /> <Compile Include="StackExchange\Redis\ClientFlags.cs" />
<Compile Include="StackExchange\Redis\ClientInfo.cs" /> <Compile Include="StackExchange\Redis\ClientInfo.cs" />
...@@ -133,6 +133,7 @@ ...@@ -133,6 +133,7 @@
<Compile Include="StackExchange\Redis\ServerSelectionStrategy.cs" /> <Compile Include="StackExchange\Redis\ServerSelectionStrategy.cs" />
<Compile Include="StackExchange\Redis\ServerType.cs" /> <Compile Include="StackExchange\Redis\ServerType.cs" />
<Compile Include="StackExchange\Redis\SetOperation.cs" /> <Compile Include="StackExchange\Redis\SetOperation.cs" />
<Compile Include="StackExchange\Redis\SocketManager.cs" />
<Compile Include="StackExchange\Redis\StringSplits.cs" /> <Compile Include="StackExchange\Redis\StringSplits.cs" />
<Compile Include="StackExchange\Redis\TaskContinuationCheck.cs" /> <Compile Include="StackExchange\Redis\TaskContinuationCheck.cs" />
<Compile Include="StackExchange\Redis\When.cs" /> <Compile Include="StackExchange\Redis\When.cs" />
......
...@@ -59,6 +59,12 @@ public ConfigurationOptions() ...@@ -59,6 +59,12 @@ public ConfigurationOptions()
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1009:DeclareEventHandlersCorrectly")] [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1009:DeclareEventHandlersCorrectly")]
public event RemoteCertificateValidationCallback CertificateValidation; public event RemoteCertificateValidationCallback CertificateValidation;
/// <summary>
/// Gets or sets the SocketManager instance to be used with these options; if this is null a per-multiplexer
/// SocketManager is created automatically.
/// </summary>
public SocketManager SocketManager { get;set; }
/// <summary> /// <summary>
/// Indicates whether admin operations should be allowed /// Indicates whether admin operations should be allowed
/// </summary> /// </summary>
...@@ -100,7 +106,7 @@ public ConfigurationOptions() ...@@ -100,7 +106,7 @@ public ConfigurationOptions()
public EndPointCollection EndPoints { get { return endpoints; } } public EndPointCollection EndPoints { get { return endpoints; } }
/// <summary> /// <summary>
/// Specifies the time in milliseconds at which connections should be pinged to ensure validity /// Specifies the time in seconds at which connections should be pinged to ensure validity
/// </summary> /// </summary>
public int KeepAlive { get { return keepAlive.GetValueOrDefault(-1); } set { keepAlive = value; } } public int KeepAlive { get { return keepAlive.GetValueOrDefault(-1); } set { keepAlive = value; } }
...@@ -178,7 +184,8 @@ public ConfigurationOptions Clone() ...@@ -178,7 +184,8 @@ public ConfigurationOptions Clone()
CommandMap = CommandMap, CommandMap = CommandMap,
CertificateValidation = CertificateValidation, CertificateValidation = CertificateValidation,
CertificateSelection = CertificateSelection, CertificateSelection = CertificateSelection,
ChannelPrefix = ChannelPrefix.Clone() ChannelPrefix = ChannelPrefix.Clone(),
SocketManager = SocketManager,
}; };
foreach (var item in endpoints) foreach (var item in endpoints)
options.endpoints.Add(item); options.endpoints.Add(item);
...@@ -298,6 +305,7 @@ void Clear() ...@@ -298,6 +305,7 @@ void Clear()
CertificateValidation = null; CertificateValidation = null;
CommandMap = CommandMap.Default; CommandMap = CommandMap.Default;
ChannelPrefix = default(RedisChannel); ChannelPrefix = default(RedisChannel);
SocketManager = null;
} }
object ICloneable.Clone() { return Clone(); } object ICloneable.Clone() { return Clone(); }
......
...@@ -6,12 +6,19 @@ namespace StackExchange.Redis ...@@ -6,12 +6,19 @@ namespace StackExchange.Redis
{ {
partial class ConnectionMultiplexer partial class ConnectionMultiplexer
{ {
partial void OnCreateReaderWriter() internal SocketManager SocketManager { get { return socketManager; } }
private SocketManager socketManager;
private bool ownsSocketManager;
partial void OnCreateReaderWriter(ConfigurationOptions configuration)
{ {
this.ownsSocketManager = configuration.SocketManager == null;
this.socketManager = configuration.SocketManager ?? new SocketManager(configuration.ClientName);
// we need a dedicated writer, because when under heavy ambient load // we need a dedicated writer, because when under heavy ambient load
// (a busy asp.net site, for example), workers are not reliable enough // (a busy asp.net site, for example), workers are not reliable enough
Thread dedicatedWriter = new Thread(writeAllQueues); Thread dedicatedWriter = new Thread(writeAllQueues);
dedicatedWriter.Name = "SE.Redis.Writer"; dedicatedWriter.Name = socketManager.Name + ":Write";
dedicatedWriter.IsBackground = true; // should not keep process alive dedicatedWriter.IsBackground = true; // should not keep process alive
dedicatedWriter.Start(this); // will self-exit when disposed dedicatedWriter.Start(this); // will self-exit when disposed
} }
...@@ -22,6 +29,8 @@ partial class ConnectionMultiplexer ...@@ -22,6 +29,8 @@ partial class ConnectionMultiplexer
{ // make sure writer threads know to exit { // make sure writer threads know to exit
Monitor.PulseAll(writeQueue); Monitor.PulseAll(writeQueue);
} }
if (ownsSocketManager) socketManager.Dispose();
socketManager = null;
} }
private readonly Queue<PhysicalBridge> writeQueue = new Queue<PhysicalBridge>(); private readonly Queue<PhysicalBridge> writeQueue = new Queue<PhysicalBridge>();
......
...@@ -87,7 +87,7 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionFailureType failur ...@@ -87,7 +87,7 @@ internal void OnConnectionFailed(EndPoint endpoint, ConnectionFailureType failur
} }
if (reconfigure) if (reconfigure)
{ {
ReconfigureIfNeeded(endpoint, false); ReconfigureIfNeeded(endpoint, false, "connection failed");
} }
} }
internal void OnConnectionRestored(EndPoint endpoint) internal void OnConnectionRestored(EndPoint endpoint)
...@@ -100,7 +100,7 @@ internal void OnConnectionRestored(EndPoint endpoint) ...@@ -100,7 +100,7 @@ internal void OnConnectionRestored(EndPoint endpoint)
new EndPointEventArgs(handler, this, endpoint) new EndPointEventArgs(handler, this, endpoint)
); );
} }
ReconfigureIfNeeded(endpoint, false); ReconfigureIfNeeded(endpoint, false, "connection restored");
} }
...@@ -118,9 +118,9 @@ internal void OnConfigurationChanged(EndPoint endpoint) ...@@ -118,9 +118,9 @@ internal void OnConfigurationChanged(EndPoint endpoint)
{ {
OnEndpointChanged(endpoint, ConfigurationChanged); OnEndpointChanged(endpoint, ConfigurationChanged);
} }
internal void OnMasterChanged(EndPoint endpoint) internal void OnConfigurationChangedBroadcast(EndPoint endpoint)
{ {
OnEndpointChanged(endpoint, MasterChanged); OnEndpointChanged(endpoint, ConfigurationChangedBroadcast);
} }
/// <summary> /// <summary>
...@@ -335,7 +335,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options ...@@ -335,7 +335,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
// and reconfigure the muxer // and reconfigure the muxer
LogLocked(log, "Reconfiguring all endpoints..."); LogLocked(log, "Reconfiguring all endpoints...");
if (!ReconfigureAsync(false, true, log, srv.EndPoint).ObserveErrors().Wait(5000)) if (!ReconfigureAsync(false, true, log, srv.EndPoint, "make master").ObserveErrors().Wait(5000))
{ {
LogLocked(log, "Verifying the configuration was incomplete; please verify"); LogLocked(log, "Verifying the configuration was incomplete; please verify");
} }
...@@ -400,9 +400,10 @@ static void WriteNormalizingLineEndings(string source, StreamWriter writer) ...@@ -400,9 +400,10 @@ static void WriteNormalizingLineEndings(string source, StreamWriter writer)
public event EventHandler<EndPointEventArgs> ConfigurationChanged; public event EventHandler<EndPointEventArgs> ConfigurationChanged;
/// <summary> /// <summary>
/// Raised when configuration changes involving a new master are announces /// Raised when nodes are explicitly requested to reconfigure via broadcast;
/// this usually means master/slave changes
/// </summary> /// </summary>
public event EventHandler<EndPointEventArgs> MasterChanged; public event EventHandler<EndPointEventArgs> ConfigurationChangedBroadcast;
/// <summary> /// <summary>
/// Gets the timeout associated with the connections /// Gets the timeout associated with the connections
...@@ -612,7 +613,7 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(string configuratio ...@@ -612,7 +613,7 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(string configuratio
{ {
var muxer = CreateMultiplexer(configuration); var muxer = CreateMultiplexer(configuration);
killMe = muxer; killMe = muxer;
bool configured = await muxer.ReconfigureAsync(true, false, log, null).ObserveErrors().ForAwait(); bool configured = await muxer.ReconfigureAsync(true, false, log, null, "connect").ObserveErrors().ForAwait();
if (!configured) if (!configured)
{ {
throw new InvalidOperationException("Unable to configure servers"); throw new InvalidOperationException("Unable to configure servers");
...@@ -635,7 +636,7 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(ConfigurationOption ...@@ -635,7 +636,7 @@ public static async Task<ConnectionMultiplexer> ConnectAsync(ConfigurationOption
{ {
var muxer = CreateMultiplexer(configuration); var muxer = CreateMultiplexer(configuration);
killMe = muxer; killMe = muxer;
bool configured = await muxer.ReconfigureAsync(true, false, log, null).ObserveErrors().ForAwait(); bool configured = await muxer.ReconfigureAsync(true, false, log, null, "connect").ObserveErrors().ForAwait();
if (!configured) if (!configured)
{ {
throw new InvalidOperationException("Unable to configure servers"); throw new InvalidOperationException("Unable to configure servers");
...@@ -676,7 +677,7 @@ public static ConnectionMultiplexer Connect(string configuration, TextWriter log ...@@ -676,7 +677,7 @@ public static ConnectionMultiplexer Connect(string configuration, TextWriter log
var muxer = CreateMultiplexer(configuration); var muxer = CreateMultiplexer(configuration);
killMe = muxer; killMe = muxer;
// note that task has timeouts internally, so it might take *just over* the reegular timeout // note that task has timeouts internally, so it might take *just over* the reegular timeout
var task = muxer.ReconfigureAsync(true, false, log, null); var task = muxer.ReconfigureAsync(true, false, log, null, "connect");
if (!task.Wait(muxer.SyncConnectTimeout)) if (!task.Wait(muxer.SyncConnectTimeout))
{ {
task.ObserveErrors(); task.ObserveErrors();
...@@ -704,7 +705,7 @@ public static ConnectionMultiplexer Connect(ConfigurationOptions configuration, ...@@ -704,7 +705,7 @@ public static ConnectionMultiplexer Connect(ConfigurationOptions configuration,
var muxer = CreateMultiplexer(configuration); var muxer = CreateMultiplexer(configuration);
killMe = muxer; killMe = muxer;
// note that task has timeouts internally, so it might take *just over* the reegular timeout // note that task has timeouts internally, so it might take *just over* the reegular timeout
var task = muxer.ReconfigureAsync(true, false, log, null); var task = muxer.ReconfigureAsync(true, false, log, null, "connect");
if (!task.Wait(muxer.SyncConnectTimeout)) if (!task.Wait(muxer.SyncConnectTimeout))
{ {
task.ObserveErrors(); task.ObserveErrors();
...@@ -759,6 +760,7 @@ private ConnectionMultiplexer(ConfigurationOptions configuration) ...@@ -759,6 +760,7 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
{ {
if (configuration == null) throw new ArgumentNullException("configuration"); if (configuration == null) throw new ArgumentNullException("configuration");
ShowKeysInTimeout = true; ShowKeysInTimeout = true;
this.configuration = configuration; this.configuration = configuration;
this.CommandMap = configuration.CommandMap; this.CommandMap = configuration.CommandMap;
this.CommandMap.AssertAvailable(RedisCommand.PING); this.CommandMap.AssertAvailable(RedisCommand.PING);
...@@ -768,6 +770,7 @@ private ConnectionMultiplexer(ConfigurationOptions configuration) ...@@ -768,6 +770,7 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
PreserveAsyncOrder = true; // safest default PreserveAsyncOrder = true; // safest default
this.timeoutMilliseconds = configuration.SyncTimeout; this.timeoutMilliseconds = configuration.SyncTimeout;
OnCreateReaderWriter(configuration);
unprocessableCompletionManager = new CompletionManager(this, "multiplexer"); unprocessableCompletionManager = new CompletionManager(this, "multiplexer");
serverSelectionStrategy = new ServerSelectionStrategy(this); serverSelectionStrategy = new ServerSelectionStrategy(this);
...@@ -776,11 +779,9 @@ private ConnectionMultiplexer(ConfigurationOptions configuration) ...@@ -776,11 +779,9 @@ private ConnectionMultiplexer(ConfigurationOptions configuration)
{ {
ConfigurationChangedChannel = Encoding.UTF8.GetBytes(configChannel); ConfigurationChangedChannel = Encoding.UTF8.GetBytes(configChannel);
} }
OnCreateReaderWriter();
} }
partial void OnCreateReaderWriter(); partial void OnCreateReaderWriter(ConfigurationOptions configuration);
internal const int MillisecondsPerHeartbeat = 1000; internal const int MillisecondsPerHeartbeat = 1000;
...@@ -871,6 +872,11 @@ internal static void TraceWithoutContext(string message, [CallerMemberName] stri ...@@ -871,6 +872,11 @@ internal static void TraceWithoutContext(string message, [CallerMemberName] stri
{ {
OnTraceWithoutContext(message, category); OnTraceWithoutContext(message, category);
} }
[Conditional("VERBOSE")]
internal static void TraceWithoutContext(bool condition, string message, [CallerMemberName] string category = null)
{
if(condition) OnTraceWithoutContext(message, category);
}
private readonly CompletionManager unprocessableCompletionManager; private readonly CompletionManager unprocessableCompletionManager;
...@@ -887,15 +893,23 @@ internal static void TraceWithoutContext(string message, [CallerMemberName] stri ...@@ -887,15 +893,23 @@ internal static void TraceWithoutContext(string message, [CallerMemberName] stri
} }
} }
int activeReconfigs = 1; string activeConfigCause;
internal void ReconfigureIfNeeded(EndPoint blame, bool masterChanged) internal void ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cause)
{ {
if (Interlocked.CompareExchange(ref activeReconfigs, 0, 0) == 0) if (fromBroadcast)
{ {
OnConfigurationChangedBroadcast(blame);
}
string activeCause = Interlocked.CompareExchange(ref activeConfigCause, null, null);
if (activeCause == null)
{
bool reconfigureAll = fromBroadcast;
Trace("Configuration change detected; checking nodes", "Configuration"); Trace("Configuration change detected; checking nodes", "Configuration");
if (masterChanged) OnMasterChanged(blame); ReconfigureAsync(false, reconfigureAll, null, blame, cause).ObserveErrors();
ReconfigureAsync(false, false, null, blame).ObserveErrors(); } else
{
Trace("Configuration change skipped; already in progress via " + activeCause, "Configuration");
} }
} }
...@@ -904,7 +918,7 @@ internal void ReconfigureIfNeeded(EndPoint blame, bool masterChanged) ...@@ -904,7 +918,7 @@ internal void ReconfigureIfNeeded(EndPoint blame, bool masterChanged)
/// </summary> /// </summary>
public Task<bool> ConfigureAsync(TextWriter log = null) public Task<bool> ConfigureAsync(TextWriter log = null)
{ {
return ReconfigureAsync(false, true, log, null).ObserveErrors(); return ReconfigureAsync(false, true, log, null, "configure").ObserveErrors();
} }
/// <summary> /// <summary>
/// Reconfigure the current connections based on the existing configuration /// Reconfigure the current connections based on the existing configuration
...@@ -913,7 +927,7 @@ public bool Configure(TextWriter log = null) ...@@ -913,7 +927,7 @@ public bool Configure(TextWriter log = null)
{ {
// note we expect ReconfigureAsync to internally allow [n] duration, // note we expect ReconfigureAsync to internally allow [n] duration,
// so to avoid near misses, here we wait 2*[n] // so to avoid near misses, here we wait 2*[n]
var task = ReconfigureAsync(false, true, log, null); var task = ReconfigureAsync(false, true, log, null, "configure");
if (!task.Wait(SyncConnectTimeout)) if (!task.Wait(SyncConnectTimeout))
{ {
task.ObserveErrors(); task.ObserveErrors();
...@@ -935,7 +949,7 @@ internal int SyncConnectTimeout ...@@ -935,7 +949,7 @@ internal int SyncConnectTimeout
} }
} }
internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame) internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause)
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (isDisposed) throw new ObjectDisposedException(ToString());
...@@ -948,7 +962,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -948,7 +962,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
bool ranThisCall = false; bool ranThisCall = false;
try try
{ // note that "activeReconfigs" starts at one; we don't need to set it the first time { // note that "activeReconfigs" starts at one; we don't need to set it the first time
ranThisCall = first || Interlocked.CompareExchange(ref activeReconfigs, 1, 0) == 0; ranThisCall = first || Interlocked.CompareExchange(ref activeConfigCause, cause, null) == null;
if (!ranThisCall) if (!ranThisCall)
{ {
...@@ -1179,7 +1193,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1179,7 +1193,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
{ {
Trace("Exiting reconfiguration..."); Trace("Exiting reconfiguration...");
OnTraceLog(log); OnTraceLog(log);
if (ranThisCall) Interlocked.Exchange(ref activeReconfigs, 0); if (ranThisCall) Interlocked.Exchange(ref activeConfigCause, null);
if (!first) OnConfigurationChanged(blame); if (!first) OnConfigurationChanged(blame);
Trace("Reconfiguration exited"); Trace("Reconfiguration exited");
} }
...@@ -1437,8 +1451,8 @@ public void Close(bool allowCommandsToComplete = true) ...@@ -1437,8 +1451,8 @@ public void Close(bool allowCommandsToComplete = true)
var quits = QuitAllServers(); var quits = QuitAllServers();
WaitAllIgnoreErrors(quits); WaitAllIgnoreErrors(quits);
} }
OnCloseReaderWriter();
DisposeAndClearServers(); DisposeAndClearServers();
OnCloseReaderWriter();
} }
partial void OnCloseReaderWriter(); partial void OnCloseReaderWriter();
......
...@@ -86,5 +86,13 @@ public interface ISubscriber : IRedis ...@@ -86,5 +86,13 @@ public interface ISubscriber : IRedis
/// </summary> /// </summary>
[IgnoreNamePrefix] [IgnoreNamePrefix]
Task<EndPoint> IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None); Task<EndPoint> IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Inidicate to which redis server we are actively subscribed for a given channel; returns null if
/// the channel is not actively subscribed
/// </summary>
[IgnoreNamePrefix]
EndPoint SubscribedEndpoint(RedisChannel channel);
} }
} }
\ No newline at end of file
...@@ -63,8 +63,8 @@ abstract class Message : ICompletable ...@@ -63,8 +63,8 @@ abstract class Message : ICompletable
protected RedisCommand command; protected RedisCommand command;
private const CommandFlags AskingFlag = (CommandFlags)32, private const CommandFlags AskingFlag = (CommandFlags)32;
InternalCallFlag = (CommandFlags)128; internal const CommandFlags InternalCallFlag = (CommandFlags)128;
public virtual void AppendStormLog(StringBuilder sb) public virtual void AppendStormLog(StringBuilder sb)
...@@ -349,12 +349,13 @@ public bool IsMasterOnly() ...@@ -349,12 +349,13 @@ public bool IsMasterOnly()
} }
/// <summary> /// <summary>
/// This does two important things: /// This does a few important things:
/// 1: it suppresses error events for commands that the user isn't interested in /// 1: it suppresses error events for commands that the user isn't interested in
/// (i.e. "why does my standalone server keep saying ERR unknown command 'cluster' ?") /// (i.e. "why does my standalone server keep saying ERR unknown command 'cluster' ?")
/// 2: it allows the initial PING and GET (during connect) to get queued rather /// 2: it allows the initial PING and GET (during connect) to get queued rather
/// than be rejected as no-server-available (note that this doesn't apply to /// than be rejected as no-server-available (note that this doesn't apply to
/// handshake messages, as they bypass the queue completely) /// handshake messages, as they bypass the queue completely)
/// 3: it disables non-pref logging, as it is usually server-targeted
/// </summary> /// </summary>
public void SetInternalCall() public void SetInternalCall()
{ {
...@@ -363,7 +364,7 @@ public void SetInternalCall() ...@@ -363,7 +364,7 @@ public void SetInternalCall()
public override string ToString() public override string ToString()
{ {
return string.Format("[{0}]:{1} ({2})", Db, Command, return string.Format("[{0}]:{1} ({2})", Db, CommandAndKey,
resultProcessor == null ? "(n/a)" : resultProcessor.GetType().Name); resultProcessor == null ? "(n/a)" : resultProcessor.GetType().Name);
} }
......
...@@ -23,7 +23,7 @@ private static readonly Message ...@@ -23,7 +23,7 @@ private static readonly Message
private int beating; private int beating;
int failConnectCount = 0; int failConnectCount = 0;
volatile bool isDisposed; volatile bool isDisposed;
private volatile int missedHeartbeats; //private volatile int missedHeartbeats;
private long operationCount, socketCount; private long operationCount, socketCount;
private int pendingCount; private int pendingCount;
private volatile PhysicalConnection physical; private volatile PhysicalConnection physical;
...@@ -116,7 +116,6 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -116,7 +116,6 @@ public bool TryEnqueue(Message message, bool isSlave)
// you can go in the queue, but we won't be starting // you can go in the queue, but we won't be starting
// a worker, because the handshake has not completed // a worker, because the handshake has not completed
queue.Push(message); queue.Push(message);
LogNonPreferred(message.Flags, isSlave);
Interlocked.Increment(ref pendingCount); Interlocked.Increment(ref pendingCount);
return true; return true;
} }
...@@ -139,6 +138,8 @@ public bool TryEnqueue(Message message, bool isSlave) ...@@ -139,6 +138,8 @@ public bool TryEnqueue(Message message, bool isSlave)
return true; return true;
} }
private void LogNonPreferred(CommandFlags flags, bool isSlave) private void LogNonPreferred(CommandFlags flags, bool isSlave)
{
if ((flags & Message.InternalCallFlag) == 0) // don't log internal-call
{ {
if (isSlave) if (isSlave)
{ {
...@@ -151,6 +152,7 @@ private void LogNonPreferred(CommandFlags flags, bool isSlave) ...@@ -151,6 +152,7 @@ private void LogNonPreferred(CommandFlags flags, bool isSlave)
Interlocked.Increment(ref nonPreferredEndpointCount); Interlocked.Increment(ref nonPreferredEndpointCount);
} }
} }
}
long nonPreferredEndpointCount; long nonPreferredEndpointCount;
internal void GetCounters(ConnectionCounters counters) internal void GetCounters(ConnectionCounters counters)
...@@ -231,27 +233,31 @@ internal void IncrementOpCount() ...@@ -231,27 +233,31 @@ internal void IncrementOpCount()
internal void KeepAlive() internal void KeepAlive()
{ {
var commandMap = multiplexer.CommandMap; var commandMap = multiplexer.CommandMap;
Message msg; Message msg = null;
switch (connectionType) switch (connectionType)
{ {
case ConnectionType.Interactive: case ConnectionType.Interactive:
if (commandMap.IsAvailable(RedisCommand.PING)) if (commandMap.IsAvailable(RedisCommand.PING))
{ {
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.PING); msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.PING);
msg.SetInternalCall(); msg.SetSource(ResultProcessor.DemandPONG, null);
serverEndPoint.QueueDirectFireAndForget(msg, ResultProcessor.DemandPONG);
} }
break; break;
case ConnectionType.Subscription: case ConnectionType.Subscription:
if (commandMap.IsAvailable(RedisCommand.UNSUBSCRIBE)) if (commandMap.IsAvailable(RedisCommand.UNSUBSCRIBE))
{ {
RedisKey channel = Guid.NewGuid().ToByteArray(); msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.UNSUBSCRIBE,
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.UNSUBSCRIBE, channel); (RedisChannel)Guid.NewGuid().ToByteArray());
msg.SetInternalCall(); msg.SetSource(ResultProcessor.TrackSubscriptions, null);
serverEndPoint.QueueDirectFireAndForget(msg, ResultProcessor.TrackSubscriptions);
} }
break; break;
} }
if(msg != null)
{
msg.SetInternalCall();
multiplexer.Trace("Enqueue: " + msg);
TryEnqueue(msg, serverEndPoint.IsSlave);
}
} }
internal void OnConnected(PhysicalConnection connection) internal void OnConnected(PhysicalConnection connection)
...@@ -267,12 +273,13 @@ internal void OnConnected(PhysicalConnection connection) ...@@ -267,12 +273,13 @@ internal void OnConnected(PhysicalConnection connection)
} }
} }
internal void OnConnectionFailed(EndPoint endPoint, ConnectionFailureType failureType, Exception innerException) internal void OnConnectionFailed(PhysicalConnection connection, ConnectionFailureType failureType, Exception innerException)
{ {
if (reportNextFailure) if (connection == physical && reportNextFailure)
{ {
reportNextFailure = false; // until it is restored reportNextFailure = false; // until it is restored
multiplexer.OnConnectionFailed(endPoint, failureType, innerException, reconfigureNextFailure); var endpoint = serverEndPoint.EndPoint;
multiplexer.OnConnectionFailed(endpoint, failureType, innerException, reconfigureNextFailure);
} }
} }
...@@ -327,7 +334,10 @@ internal void OnFullyEstablished(PhysicalConnection connection) ...@@ -327,7 +334,10 @@ internal void OnFullyEstablished(PhysicalConnection connection)
try { connection.Dispose(); } catch { } try { connection.Dispose(); } catch { }
} }
} }
internal int GetPendingCount()
{
return Thread.VolatileRead(ref pendingCount);
}
internal void OnHeartbeat() internal void OnHeartbeat()
{ {
bool runThisTime = false; bool runThisTime = false;
...@@ -348,11 +358,10 @@ internal void OnHeartbeat() ...@@ -348,11 +358,10 @@ internal void OnHeartbeat()
var tmp = physical; var tmp = physical;
if (tmp != null) if (tmp != null)
{ {
int maxMissed = serverEndPoint.MaxMissedHeartbeats; int writeEvery = serverEndPoint.WriteEverySeconds;
if (maxMissed > 0 && ++missedHeartbeats >= maxMissed && Thread.VolatileRead(ref pendingCount) == 0) if (writeEvery > 0 && tmp.LastWriteSecondsAgo >= writeEvery && Thread.VolatileRead(ref pendingCount) == 0)
{ {
Trace("OnHeartbeat - overdue"); Trace("OnHeartbeat - overdue");
missedHeartbeats = 0;
if (state == (int)State.ConnectedEstablished) if (state == (int)State.ConnectedEstablished)
{ {
KeepAlive(); KeepAlive();
...@@ -385,11 +394,6 @@ internal void RemovePhysical(PhysicalConnection connection) ...@@ -385,11 +394,6 @@ internal void RemovePhysical(PhysicalConnection connection)
Interlocked.CompareExchange(ref physical, null, connection); Interlocked.CompareExchange(ref physical, null, connection);
} }
internal void Seen()
{
missedHeartbeats = 0;
}
[Conditional("VERBOSE")] [Conditional("VERBOSE")]
internal void Trace(string message) internal void Trace(string message)
{ {
...@@ -662,23 +666,26 @@ internal WriteResult WriteQueue(int maxWork) ...@@ -662,23 +666,26 @@ internal WriteResult WriteQueue(int maxWork)
} }
int count = 0; int count = 0;
Message last = null;
while (true) while (true)
{ {
var next = queue.Dequeue(); var next = queue.Dequeue();
if (next == null) if (next == null)
{ {
Trace("Nothing to write; exiting"); Trace("Nothing to write; exiting");
{ Trace(last != null, "Flushed up to: " + last);
conn.Flush(); conn.Flush();
return WriteResult.QueueEmpty; return WriteResult.QueueEmpty;
} }
} last = next;
var newPendingCount = Interlocked.Decrement(ref pendingCount); var newPendingCount = Interlocked.Decrement(ref pendingCount);
Trace("Now pending: " + newPendingCount); Trace("Now pending: " + newPendingCount);
WriteMessageDirect(conn, next); WriteMessageDirect(conn, next);
count++; count++;
if (maxWork > 0 && count >= maxWork) if (maxWork > 0 && count >= maxWork)
{ {
Trace("Work limit; exiting");
Trace(last != null, "Flushed up to: " + last);
conn.Flush(); conn.Flush();
break; break;
} }
......
...@@ -20,16 +20,20 @@ internal enum WorkState ...@@ -20,16 +20,20 @@ internal enum WorkState
HasWork HasWork
} }
internal sealed partial class PhysicalConnection : IDisposable internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback
{ {
void ISocketCallback.Error()
{
RecordConnectionFailed(ConnectionFailureType.SocketFailure);
}
private const int DefaultRedisDatabaseCount = 16; private const int DefaultRedisDatabaseCount = 16;
public long SubscriptionCount { get;set; } public long SubscriptionCount { get;set; }
private static readonly byte[] Crlf = Encoding.ASCII.GetBytes("\r\n"); private static readonly byte[] Crlf = Encoding.ASCII.GetBytes("\r\n");
private static readonly AsyncCallback endReadShared = EndReadShared;
private static readonly byte[] message = Encoding.UTF8.GetBytes("message"), pmessage = Encoding.UTF8.GetBytes("pmessage"); private static readonly byte[] message = Encoding.UTF8.GetBytes("message"), pmessage = Encoding.UTF8.GetBytes("pmessage");
static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select( static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
...@@ -62,10 +66,11 @@ private static readonly Message ...@@ -62,10 +66,11 @@ private static readonly Message
// things sent to this physical, but not yet received // things sent to this physical, but not yet received
Queue<Message> outstanding = new Queue<Message>(); Queue<Message> outstanding = new Queue<Message>();
private Socket socket; private SocketToken socketToken;
public PhysicalConnection(PhysicalBridge bridge) public PhysicalConnection(PhysicalBridge bridge)
{ {
lastWriteTickCount = lastReadTickCount = Environment.TickCount;
this.connectionType = bridge.ConnectionType; this.connectionType = bridge.ConnectionType;
this.multiplexer = bridge.Multiplexer; this.multiplexer = bridge.Multiplexer;
this.ChannelPrefix = multiplexer.RawConfig.ChannelPrefix; this.ChannelPrefix = multiplexer.RawConfig.ChannelPrefix;
...@@ -74,11 +79,17 @@ public PhysicalConnection(PhysicalBridge bridge) ...@@ -74,11 +79,17 @@ public PhysicalConnection(PhysicalBridge bridge)
physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + endpoint.ToString(); physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + endpoint.ToString();
this.bridge = bridge; this.bridge = bridge;
multiplexer.Trace("Connecting...", physicalName); multiplexer.Trace("Connecting...", physicalName);
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.NoDelay = true; this.socketToken = multiplexer.SocketManager.BeginConnect(endpoint, this);
//socket.SendTimeout = socket.ReceiveTimeout = multiplexer.TimeoutMilliseconds; //socket.SendTimeout = socket.ReceiveTimeout = multiplexer.TimeoutMilliseconds;
OnCreateEcho(); OnCreateEcho();
socket.BeginConnect(endpoint, ConnectCallback, socket); }
public long LastWriteSecondsAgo
{
get
{
return unchecked(Environment.TickCount - Interlocked.Read(ref lastWriteTickCount)) / 1000;
}
} }
private enum ReadMode : byte private enum ReadMode : byte
...@@ -94,36 +105,27 @@ private enum ReadMode : byte ...@@ -94,36 +105,27 @@ private enum ReadMode : byte
public bool TransactionActive { get; internal set; } public bool TransactionActive { get; internal set; }
public void BeginRead()
{
multiplexer.Trace("BeginRead", physicalName);
try
{
int space = EnsureSpaceAndComputeBytesToRead();
netStream.BeginRead(ioBuffer, ioBufferBytes, space, endReadShared, this);
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
}
public void Dispose() public void Dispose()
{ {
if (socket != null) if (outStream != null)
{ {
multiplexer.Trace("Disconnecting...", physicalName); multiplexer.Trace("Disconnecting...", physicalName);
try { outStream.Close(); } catch { } try { outStream.Close(); } catch { }
try { outStream.Dispose(); } catch { } try { outStream.Dispose(); } catch { }
outStream = null; outStream = null;
}
if (netStream != null)
{
try { netStream.Close(); } catch { } try { netStream.Close(); } catch { }
try { netStream.Dispose(); } catch { } try { netStream.Dispose(); } catch { }
netStream = null; netStream = null;
try { socket.Shutdown(SocketShutdown.Both); } catch { } }
try { socket.Close(); } catch { } if (socketToken.HasValue)
try { socket.Dispose(); } catch { } {
socket = null; var socketManager = multiplexer.SocketManager;
if(socketManager !=null) socketManager.Shutdown(socketToken);
socketToken = default(SocketToken);
multiplexer.Trace("Disconnected", physicalName); multiplexer.Trace("Disconnected", physicalName);
RecordConnectionFailed(ConnectionFailureType.ConnectionDisposed); RecordConnectionFailed(ConnectionFailureType.ConnectionDisposed);
} }
...@@ -148,9 +150,11 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -148,9 +150,11 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
try try
{ {
long now = Environment.TickCount, lastRead = Interlocked.Read(ref lastReadTickCount), lastWrite = Interlocked.Read(ref lastWriteTickCount); long now = Environment.TickCount, lastRead = Interlocked.Read(ref lastReadTickCount), lastWrite = Interlocked.Read(ref lastWriteTickCount);
string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType
+ ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetOutstandingCount() + ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetOutstandingCount()
+ ", last-read: " + (now - lastRead) / 1000 + "s ago, last-write: " + (now - lastWrite) / 1000 + "s ago"; + ", last-read: " + unchecked(now - lastRead) / 1000 + "s ago, last-write: " + unchecked(now - lastWrite) / 1000 + "s ago, keep-alive: " + bridge.ServerEndPoint.WriteEverySeconds + "s, pending: "
+ bridge.GetPendingCount();
var ex = innerException == null var ex = innerException == null
? new RedisConnectionException(failureType, message) ? new RedisConnectionException(failureType, message)
...@@ -159,7 +163,7 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -159,7 +163,7 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
} }
catch(Exception caught) catch(Exception caught)
{ {
bridge.OnConnectionFailed(bridge.ServerEndPoint.EndPoint, failureType, caught); bridge.OnConnectionFailed(this, failureType, caught);
} }
} }
...@@ -178,12 +182,7 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -178,12 +182,7 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
} }
// burn the socket // burn the socket
var tmp = socket; multiplexer.SocketManager.Shutdown(socketToken);
if (tmp != null)
{
try { tmp.Shutdown(SocketShutdown.Both); } catch { }
try { tmp.Close(); } catch { }
}
} }
public override string ToString() public override string ToString()
...@@ -339,10 +338,6 @@ internal void WriteHeader(RedisCommand command, int arguments) ...@@ -339,10 +338,6 @@ internal void WriteHeader(RedisCommand command, int arguments)
WriteUnified(outStream, commandBytes); WriteUnified(outStream, commandBytes);
} }
private static void EndReadShared(IAsyncResult ar)
{
((PhysicalConnection)ar.AsyncState).EndRead(ar);
}
static void WriteRaw(Stream stream, long value, bool withLengthPrefix = false) static void WriteRaw(Stream stream, long value, bool withLengthPrefix = false)
{ {
...@@ -458,35 +453,30 @@ static void WriteUnified(Stream stream, long value) ...@@ -458,35 +453,30 @@ static void WriteUnified(Stream stream, long value)
WriteRaw(stream, value, withLengthPrefix: true); WriteRaw(stream, value, withLengthPrefix: true);
} }
private void ConnectCallback(IAsyncResult ar) void ISocketCallback.Connected(Stream stream)
{ {
try try
{ {
var sock = (Socket)ar.AsyncState;
(sock).EndConnect(ar);
// disallow connection in some cases // disallow connection in some cases
OnDebugAbort(); OnDebugAbort();
// the order is important here: // the order is important here:
// [network]<==[ssl]<==[logging]<==[buffered] // [network]<==[ssl]<==[logging]<==[buffered]
netStream = new NetworkStream(sock);
var config = multiplexer.RawConfig; var config = multiplexer.RawConfig;
if (!string.IsNullOrWhiteSpace(config.SslHost)) if (!string.IsNullOrWhiteSpace(config.SslHost))
{ {
var ssl = new SslStream(netStream, false, config.CertificateValidationCallback, config.CertificateSelectionCallback, EncryptionPolicy.RequireEncryption); var ssl = new SslStream(netStream, false, config.CertificateValidationCallback, config.CertificateSelectionCallback, EncryptionPolicy.RequireEncryption);
ssl.AuthenticateAsClient(config.SslHost); ssl.AuthenticateAsClient(config.SslHost);
netStream = ssl; stream = ssl;
} }
OnWrapForLogging(ref netStream, physicalName); OnWrapForLogging(ref stream, physicalName);
this.netStream = stream;
int bufferSize = config.WriteBuffer; int bufferSize = config.WriteBuffer;
outStream = bufferSize <= 0 ? netStream : new BufferedStream(netStream, bufferSize); outStream = bufferSize <= 0 ? netStream : new BufferedStream(stream, bufferSize);
multiplexer.Trace("Connected", physicalName); multiplexer.Trace("Connected", physicalName);
BeginRead();
bridge.OnConnected(this); bridge.OnConnected(this);
} }
catch (Exception ex) catch (Exception ex)
...@@ -496,23 +486,6 @@ private void ConnectCallback(IAsyncResult ar) ...@@ -496,23 +486,6 @@ private void ConnectCallback(IAsyncResult ar)
} }
} }
private void EndRead(IAsyncResult ar)
{
bridge.Seen();
try
{
var tmp = netStream;
int read = tmp == null ? 0 : tmp.EndRead(ar);
multiplexer.Trace("EndRead:" + read, physicalName);
ProcessBytes(read);
}
catch (Exception ex)
{
multiplexer.Trace("EndRead:" + ex.Message, physicalName);
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
}
int EnsureSpaceAndComputeBytesToRead() int EnsureSpaceAndComputeBytesToRead()
{ {
int space = ioBuffer.Length - ioBufferBytes; int space = ioBuffer.Length - ioBufferBytes;
...@@ -526,7 +499,6 @@ int EnsureSpaceAndComputeBytesToRead() ...@@ -526,7 +499,6 @@ int EnsureSpaceAndComputeBytesToRead()
internal readonly byte[] ChannelPrefix; internal readonly byte[] ChannelPrefix;
void MatchResult(RawResult result) void MatchResult(RawResult result)
{ {
multiplexer.Trace("Matching result...", physicalName);
// check to see if it could be an out-of-band pubsub message // check to see if it could be an out-of-band pubsub message
if (connectionType == ConnectionType.Subscription && result.Type == ResultType.Array) if (connectionType == ConnectionType.Subscription && result.Type == ResultType.Array)
{ // out of band message does not match to a queued message { // out of band message does not match to a queued message
...@@ -546,11 +518,13 @@ void MatchResult(RawResult result) ...@@ -546,11 +518,13 @@ void MatchResult(RawResult result)
} }
} }
catch { } catch { }
multiplexer.ReconfigureIfNeeded(blame, true); multiplexer.Trace("Configuration changed: " + Format.ToString(blame), physicalName);
multiplexer.ReconfigureIfNeeded(blame, true, "broadcast");
} }
// invoke the handlers // invoke the handlers
var channel = items[1].AsRedisChannel(ChannelPrefix); var channel = items[1].AsRedisChannel(ChannelPrefix);
multiplexer.Trace("MESSAGE: " + channel, physicalName);
if (!channel.IsNull) if (!channel.IsNull)
{ {
multiplexer.OnMessage(channel, channel, items[2].AsRedisValue()); multiplexer.OnMessage(channel, channel, items[2].AsRedisValue());
...@@ -560,6 +534,7 @@ void MatchResult(RawResult result) ...@@ -560,6 +534,7 @@ void MatchResult(RawResult result)
else if (items.Length >= 4 && items[0].Assert(pmessage)) else if (items.Length >= 4 && items[0].Assert(pmessage))
{ {
var channel = items[2].AsRedisChannel(ChannelPrefix); var channel = items[2].AsRedisChannel(ChannelPrefix);
multiplexer.Trace("PMESSAGE: " + channel, physicalName);
if (!channel.IsNull) if (!channel.IsNull)
{ {
var sub = items[1].AsRedisChannel(ChannelPrefix); var sub = items[1].AsRedisChannel(ChannelPrefix);
...@@ -570,7 +545,7 @@ void MatchResult(RawResult result) ...@@ -570,7 +545,7 @@ void MatchResult(RawResult result)
// if it didn't look like "[p]message", then we still need to process the pending queue // if it didn't look like "[p]message", then we still need to process the pending queue
} }
multiplexer.Trace("Matching result...", physicalName);
Message msg; Message msg;
lock (outstanding) lock (outstanding)
{ {
...@@ -613,21 +588,22 @@ private int ProcessBuffer(byte[] underlying, ref int offset, ref int count) ...@@ -613,21 +588,22 @@ private int ProcessBuffer(byte[] underlying, ref int offset, ref int count)
return messageCount; return messageCount;
} }
private void ProcessBytes(int bytesRead) void ISocketCallback.Read()
{ {
try try
{ {
bool keepGoing;
do do
{ {
int space = EnsureSpaceAndComputeBytesToRead();
int bytesRead = netStream.Read(ioBuffer, ioBufferBytes, space);
if (bytesRead <= 0) if (bytesRead <= 0)
{ {
multiplexer.Trace("EOF", physicalName); multiplexer.Trace("EOF", physicalName);
RecordConnectionFailed(ConnectionFailureType.SocketClosed); RecordConnectionFailed(ConnectionFailureType.SocketClosed);
keepGoing = false; return;
} }
else
{
Interlocked.Exchange(ref lastReadTickCount, Environment.TickCount); Interlocked.Exchange(ref lastReadTickCount, Environment.TickCount);
ioBufferBytes += bytesRead; ioBufferBytes += bytesRead;
multiplexer.Trace("More bytes available: " + bytesRead + " (" + ioBufferBytes + ")", physicalName); multiplexer.Trace("More bytes available: " + bytesRead + " (" + ioBufferBytes + ")", physicalName);
...@@ -646,25 +622,8 @@ private void ProcessBytes(int bytesRead) ...@@ -646,25 +622,8 @@ private void ProcessBytes(int bytesRead)
} }
ioBufferBytes = count; ioBufferBytes = count;
} }
} while (socketToken.Available != 0);
if (socket.Available == 0) // ^^^ note that the socket manager will call us again when there is something to do
{
BeginRead();
keepGoing = false;
}
else
{
// there is more available on the socket; read it SYNC
// and keep processing
multiplexer.Trace("Socket has data; reading synchronously", physicalName);
int space = EnsureSpaceAndComputeBytesToRead();
bridge.Seen();
bytesRead = netStream.Read(ioBuffer, ioBufferBytes, space);
bridge.Seen();
keepGoing = true;
}
}
} while (keepGoing);
} }
catch (Exception ex) catch (Exception ex)
{ {
......
...@@ -96,29 +96,43 @@ public Task<EndPoint> IdentifyEndpointAsync(RedisChannel channel, CommandFlags f ...@@ -96,29 +96,43 @@ public Task<EndPoint> IdentifyEndpointAsync(RedisChannel channel, CommandFlags f
msg.SetInternalCall(); msg.SetInternalCall();
return ExecuteAsync(msg, ResultProcessor.ConnectionIdentity); return ExecuteAsync(msg, ResultProcessor.ConnectionIdentity);
} }
public EndPoint SubscribedEndpoint(RedisChannel channel)
{
var server = multiplexer.GetSubscribedServer(channel);
return server == null ? null : server.EndPoint;
}
} }
partial class ConnectionMultiplexer partial class ConnectionMultiplexer
{ {
internal bool SubscriberConnected(RedisChannel channel = default(RedisChannel)) internal ServerEndPoint GetSubscribedServer(RedisChannel channel)
{ {
ServerEndPoint server;
if (!channel.IsNullOrEmpty) if (!channel.IsNullOrEmpty)
{ {
lock(subscriptions) lock (subscriptions)
{ {
Subscription sub; Subscription sub;
if(subscriptions.TryGetValue(channel, out sub)) if (subscriptions.TryGetValue(channel, out sub))
{ {
server = sub.GetOwner(); return sub.GetOwner();
} }
} }
} }
return null;
}
internal bool SubscriberConnected(RedisChannel channel = default(RedisChannel))
{
var server = GetSubscribedServer(channel);
if (server != null) return server.IsConnected;
server = SelectServer(-1, RedisCommand.SUBSCRIBE, CommandFlags.DemandMaster, default(RedisKey)); server = SelectServer(-1, RedisCommand.SUBSCRIBE, CommandFlags.DemandMaster, default(RedisKey));
return server != null && server.IsConnected; return server != null && server.IsConnected;
} }
private sealed class Subscription private sealed class Subscription
{ {
private Action<RedisChannel, RedisValue> handler; private Action<RedisChannel, RedisValue> handler;
...@@ -326,6 +340,5 @@ internal void ResendSubscriptions(ServerEndPoint server) ...@@ -326,6 +340,5 @@ internal void ResendSubscriptions(ServerEndPoint server)
} }
return false; return false;
} }
} }
} }
...@@ -376,20 +376,19 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -376,20 +376,19 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
if (key.Assert(timeout) && arr[(i * 2) + 1].TryGetInt64(out i64)) if (key.Assert(timeout) && arr[(i * 2) + 1].TryGetInt64(out i64))
{ {
// note the configuration is in seconds // note the configuration is in seconds
int timeoutMilliseconds = checked((int)i64) * 1000, int timeoutSeconds = checked((int)i64), targetSeconds;
targetMilliseconds; if (timeoutSeconds > 0)
if (timeoutMilliseconds > 0)
{ {
if (timeoutMilliseconds >= 60000) if (timeoutSeconds >= 60)
{ {
targetMilliseconds = timeoutMilliseconds - 15000; // time to spare... targetSeconds = timeoutSeconds - 20; // time to spare...
} }
else else
{ {
targetMilliseconds = (timeoutMilliseconds * 3) / 4; targetSeconds = (timeoutSeconds * 3) / 4;
} }
server.Multiplexer.Trace("Auto-configured timeout: " + targetMilliseconds + "ms"); server.Multiplexer.Trace("Auto-configured timeout: " + targetSeconds + "s");
server.SetHeartbeatMilliseconds(targetMilliseconds); server.WriteEverySeconds = targetSeconds;
} }
} }
else if (key.Assert(databases) && arr[(i * 2) + 1].TryGetInt64(out i64)) else if (key.Assert(databases) && arr[(i * 2) + 1].TryGetInt64(out i64))
...@@ -800,7 +799,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -800,7 +799,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
case ResultType.BulkString: case ResultType.BulkString:
string s = result.GetString(); string s = result.GetString();
RedisType value; RedisType value;
if (!Enum.TryParse<RedisType>(s, true, out value)) value = StackExchange.Redis.RedisType.Unknown; if (!Enum.TryParse<RedisType>(s, true, out value)) value = global::StackExchange.Redis.RedisType.Unknown;
SetResult(message, value); SetResult(message, value);
return true; return true;
} }
......
...@@ -31,7 +31,7 @@ internal sealed partial class ServerEndPoint : IDisposable ...@@ -31,7 +31,7 @@ internal sealed partial class ServerEndPoint : IDisposable
private readonly ConnectionMultiplexer multiplexer; private readonly ConnectionMultiplexer multiplexer;
private int databases, maxMissedHeartbeats; private int databases, writeEverySeconds;
private PhysicalBridge interactive, subscription; private PhysicalBridge interactive, subscription;
...@@ -54,7 +54,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint) ...@@ -54,7 +54,7 @@ public ServerEndPoint(ConnectionMultiplexer multiplexer, EndPoint endpoint)
slaveReadOnly = true; slaveReadOnly = true;
isSlave = false; isSlave = false;
databases = 0; databases = 0;
maxMissedHeartbeats = ComputeBeatsFromMilliseconds(config.KeepAlive); writeEverySeconds = config.KeepAlive;
interactive = CreateBridge(ConnectionType.Interactive); interactive = CreateBridge(ConnectionType.Interactive);
serverType = ServerType.Standalone; serverType = ServerType.Standalone;
} }
...@@ -78,7 +78,7 @@ public bool IsConnected ...@@ -78,7 +78,7 @@ public bool IsConnected
public bool IsSlave { get { return isSlave; } set { SetConfig(ref isSlave, value); } } public bool IsSlave { get { return isSlave; } set { SetConfig(ref isSlave, value); } }
public int MaxMissedHeartbeats { get { return maxMissedHeartbeats; } set { SetConfig(ref maxMissedHeartbeats, value); } } public int WriteEverySeconds { get { return writeEverySeconds; } set { SetConfig(ref writeEverySeconds, value); } }
public long OperationCount public long OperationCount
{ {
...@@ -357,11 +357,6 @@ internal void ReportNextFailure() ...@@ -357,11 +357,6 @@ internal void ReportNextFailure()
if (tmp != null) tmp.ReportNextFailure(); if (tmp != null) tmp.ReportNextFailure();
} }
internal void SetHeartbeatMilliseconds(int value)
{
MaxMissedHeartbeats = ComputeBeatsFromMilliseconds(value);
}
internal string Summary() internal string Summary()
{ {
var sb = new StringBuilder(Format.ToString(endpoint)) var sb = new StringBuilder(Format.ToString(endpoint))
...@@ -369,9 +364,8 @@ internal string Summary() ...@@ -369,9 +364,8 @@ internal string Summary()
if (databases > 0) sb.Append("; ").Append(databases).Append(" databases"); if (databases > 0) sb.Append("; ").Append(databases).Append(" databases");
if (maxMissedHeartbeats > 0) if (writeEverySeconds > 0)
sb.Append("; keep-alive: ").Append( sb.Append("; keep-alive: ").Append(TimeSpan.FromSeconds(writeEverySeconds));
TimeSpan.FromMilliseconds(maxMissedHeartbeats * ConnectionMultiplexer.MillisecondsPerHeartbeat));
var tmp = interactive; var tmp = interactive;
sb.Append("; int: ").Append(tmp == null ? "n/a" : tmp.ConnectionState.ToString()); sb.Append("; int: ").Append(tmp == null ? "n/a" : tmp.ConnectionState.ToString());
tmp = subscription; tmp = subscription;
...@@ -418,19 +412,6 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection, ...@@ -418,19 +412,6 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
} }
} }
private static int ComputeBeatsFromMilliseconds(int value)
{
if (value > 0)
{
int beats = value / ConnectionMultiplexer.MillisecondsPerHeartbeat;
if (beats == 0) beats = 1;
return beats;
}
else
{
return -1;
}
}
private PhysicalBridge CreateBridge(ConnectionType type) private PhysicalBridge CreateBridge(ConnectionType type)
{ {
multiplexer.Trace(type.ToString()); multiplexer.Trace(type.ToString());
...@@ -491,7 +472,7 @@ void Handshake(PhysicalConnection connection) ...@@ -491,7 +472,7 @@ void Handshake(PhysicalConnection connection)
var configChannel = multiplexer.ConfigurationChangedChannel; var configChannel = multiplexer.ConfigurationChangedChannel;
if(configChannel != null) if(configChannel != null)
{ {
msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.SUBSCRIBE, (RedisValue)configChannel); msg = Message.Create(-1, CommandFlags.FireAndForget, RedisCommand.SUBSCRIBE, (RedisChannel)configChannel);
WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.TrackSubscriptions); WriteDirectOrQueueFireAndForget(connection, msg, ResultProcessor.TrackSubscriptions);
} }
} }
...@@ -505,7 +486,7 @@ private void SetConfig<T>(ref T field, T value, [CallerMemberName] string caller ...@@ -505,7 +486,7 @@ private void SetConfig<T>(ref T field, T value, [CallerMemberName] string caller
{ {
multiplexer.Trace(caller + " changed from " + field + " to " + value, "Configuration"); multiplexer.Trace(caller + " changed from " + field + " to " + value, "Configuration");
field = value; field = value;
multiplexer.ReconfigureIfNeeded(endpoint, false); multiplexer.ReconfigureIfNeeded(endpoint, false, caller);
} }
} }
......
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace StackExchange.Redis
{
/// <summary>
/// A SocketManager monitors multiple sockets for availability of data; this is done using
/// the Socket.Select API and a dedicated reader-thread, which allows for fast responses
/// even when the system is under ambient load.
/// </summary>
public sealed class SocketManager : IDisposable
{
private readonly string name;
/// <summary>
/// Creates a new (optionally named) SocketManager instance
/// </summary>
public SocketManager(string name = null)
{
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
this.name = name;
}
/// <summary>
/// Gets the name of this SocketManager instance
/// </summary>
public string Name { get { return name; } }
bool isDisposed;
private readonly Dictionary<Socket, ISocketCallback> socketLookup = new Dictionary<Socket, ISocketCallback>();
private readonly List<Socket> readQueue = new List<Socket>(), errorQueue = new List<Socket>();
/// <summary>
/// Adds a new socket and callback to the manager
/// </summary>
private void AddRead(Socket socket, ISocketCallback callback)
{
if (socket == null) throw new ArgumentNullException("socket");
if (callback == null) throw new ArgumentNullException("callback");
lock (socketLookup)
{
if (isDisposed) throw new ObjectDisposedException(name);
socketLookup.Add(socket, callback);
if (socketLookup.Count == 1)
{
Monitor.PulseAll(socketLookup);
if (Interlocked.CompareExchange(ref readerCount, 0, 0) == 0)
StartReader();
}
}
}
private void StartReader()
{
var thread = new Thread(read, 32 * 1024); // don't need a huge stack
thread.Name = name + ":Read";
thread.IsBackground = true;
thread.Priority = ThreadPriority.AboveNormal; // time critical
thread.Start(this);
}
private static ParameterizedThreadStart read = state => ((SocketManager)state).Read();
private void Read()
{
bool weAreReader = false;
try
{
weAreReader = Interlocked.CompareExchange(ref readerCount, 1, 0) == 0;
if (weAreReader) ReadImpl();
}
catch (Exception ex)
{
Debug.WriteLine(ex);
Trace.WriteLine(ex);
}
finally
{
if (weAreReader) Interlocked.Exchange(ref readerCount, 0);
}
}
private void ReadImpl()
{
List<Socket> dead = null;
while (true)
{
readQueue.Clear();
errorQueue.Clear();
lock (socketLookup)
{
if (isDisposed) return;
if (socketLookup.Count == 0)
{
// if empty, give it a few seconds chance before exiting
Monitor.Wait(socketLookup, TimeSpan.FromSeconds(20));
if (socketLookup.Count == 0) return; // nothing new came in, so exit
}
if (dead != null) dead.Clear();
foreach (var pair in socketLookup)
{
if (pair.Key.Connected)
{
readQueue.Add(pair.Key);
errorQueue.Add(pair.Key);
}
else
{
(dead ?? (dead = new List<Socket>())).Add(pair.Key);
}
}
if (dead != null && dead.Count != 0)
{
foreach (var socket in dead) socketLookup.Remove(socket);
}
}
int pollingSockets = readQueue.Count;
if (pollingSockets == 0)
{
// nobody had actual sockets; just sleep
Thread.Sleep(10);
continue;
}
try
{
Socket.Select(readQueue, null, errorQueue, 100);
ConnectionMultiplexer.TraceWithoutContext(readQueue.Count != 0, "Read sockets: " + readQueue.Count);
ConnectionMultiplexer.TraceWithoutContext(errorQueue.Count != 0, "Error sockets: " + errorQueue.Count);
}
catch (Exception ex)
{ // this typically means a socket was disposed just before
Trace.WriteLine(ex.Message);
continue;
}
int totalWork = readQueue.Count + errorQueue.Count;
if (totalWork == 0) continue;
if (totalWork >= 10) // number of sockets we should attempt to process by ourself before asking for help
{
// seek help, work in parallel, then synchronize
lock (QueueDrainSyncLock)
{
ThreadPool.QueueUserWorkItem(HelpProcessItems, this);
ProcessItems();
Monitor.Wait(QueueDrainSyncLock);
}
}
else
{
// just do it ourself
ProcessItems();
}
}
}
internal void Shutdown(SocketToken token)
{
var socket = token.Socket;
if (socket != null)
{
lock (socketLookup)
{
socketLookup.Remove(socket);
}
try { socket.Shutdown(SocketShutdown.Both); } catch { }
try { socket.Close(); } catch { }
try { socket.Dispose(); } catch { }
}
}
private readonly object QueueDrainSyncLock = new object();
static readonly WaitCallback HelpProcessItems = state =>
{
var mgr = (SocketManager)state;
mgr.ProcessItems();
lock (mgr.QueueDrainSyncLock)
{
Monitor.PulseAll(mgr.QueueDrainSyncLock);
}
};
private void ProcessItems()
{
ProcessItems(socketLookup, readQueue, CallbackOperation.Read);
ProcessItems(socketLookup, errorQueue, CallbackOperation.Error);
}
private static void ProcessItems(Dictionary<Socket, ISocketCallback> socketLookup, List<Socket> list, CallbackOperation operation)
{
if (list == null) return;
while (true)
{
// get the next item (note we could be competing with a worker here, hence lock)
Socket socket;
lock (list)
{
int index = list.Count - 1;
if (index < 0) break;
socket = list[index];
list.RemoveAt(index); // note: removing from end to avoid moving everything
}
ISocketCallback callback;
lock (socketLookup)
{
if (!socketLookup.TryGetValue(socket, out callback)) callback = null;
}
if (callback != null)
{
#if VERBOSE
var watch = Stopwatch.StartNew();
#endif
switch (operation)
{
case CallbackOperation.Read: callback.Read(); break;
case CallbackOperation.Error: callback.Error(); break;
}
#if VERBOSE
watch.Stop();
ConnectionMultiplexer.TraceWithoutContext(string.Format("{0}: {1}ms on {2}", operation, watch.ElapsedMilliseconds, callback));
#endif
}
}
}
private enum CallbackOperation
{
Read,
Error
}
private int readerCount;
/// <summary>
/// Releases all resources associated with this instance
/// </summary>
public void Dispose()
{
lock (socketLookup)
{
isDisposed = true;
socketLookup.Clear();
Monitor.PulseAll(socketLookup);
}
}
internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback)
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.NoDelay = true;
socket.BeginConnect(endpoint, EndConnect, Tuple.Create(socket, callback));
return new SocketToken(socket);
}
private void EndConnect(IAsyncResult ar)
{
Tuple<Socket, ISocketCallback> tuple = null;
try
{
tuple = (Tuple<Socket, ISocketCallback>)ar.AsyncState;
var socket = tuple.Item1;
var callback = tuple.Item2;
socket.EndConnect(ar);
AddRead(socket, callback);
var netStream = new NetworkStream(socket, false);
callback.Connected(netStream);
}
catch
{
if (tuple != null)
{
tuple.Item2.Error();
}
}
}
}
/// <summary>
/// Allows callbacks from SocketManager as work is discovered
/// </summary>
internal interface ISocketCallback
{
/// <summary>
/// Indicates that a socket has connected
/// </summary>
void Connected(Stream stream);
/// <summary>
/// Indicates that data is available on the socket, and that the consumer should read from the socket
/// </summary>
void Read();
/// <summary>
/// Indicates that the socket has signalled an error condition
/// </summary>
void Error();
}
internal struct SocketToken
{
internal readonly Socket Socket;
public SocketToken(Socket socket)
{
this.Socket = socket;
}
public int Available { get { return Socket == null ? 0 : Socket.Available; } }
public bool HasValue { get { return Socket != null; } }
}
}
@packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7000
\ No newline at end of file
@packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7001
\ No newline at end of file
@packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7002
\ No newline at end of file
@packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7003
\ No newline at end of file
@packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7004
\ No newline at end of file
@packages\Redis-64.2.6.12.1\tools\redis-cli.exe -h cluster -p 7005
\ No newline at end of file
@packages\Redis-64.2.6.12.1\tools\redis-cli.exe -p 6379
\ No newline at end of file
@packages\Redis-64.2.6.12.1\tools\redis-cli.exe -p 6381
\ No newline at end of file
@packages\Redis-64.2.6.12.1\tools\redis-cli.exe -p 6380
\ No newline at end of file
@packages\Redis-64.2.6.12.1\tools\redis-server.exe master.conf
\ No newline at end of file
@packages\Redis-64.2.6.12.1\tools\redis-server.exe secure.conf
\ No newline at end of file
@packages\Redis-64.2.6.12.1\tools\redis-server.exe slave.conf
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment