Commit e7a3485e authored by Marc Gravell's avatar Marc Gravell

fix partial interfaces; codemaid; split SocketManager.Poll.cs code out (mono vs .NET)

parent 15d1d2f4
......@@ -352,7 +352,7 @@ private void breakSocket_Click(object sender, EventArgs e)
{
try
{
((IRedisServerDebug)muxer.GetServer(pair.EndPoint)).SimulateConnectionFailure();
muxer.GetServer(pair.EndPoint).SimulateConnectionFailure();
} catch(Exception ex)
{
Log(ex.Message);
......
......@@ -24,7 +24,7 @@ public void AsyncTasksReportFailureIfServerUnavailable()
using(var conn = Create(allowAdmin: true))
{
var server = (IRedisServerDebug)conn.GetServer(PrimaryServer, PrimaryPort);
var server = conn.GetServer(PrimaryServer, PrimaryPort);
RedisKey key = Me();
var db = conn.GetDatabase();
......
......@@ -521,7 +521,7 @@ public void TestQuit(bool preserveOrder)
string key = Guid.NewGuid().ToString();
db.KeyDelete(key, CommandFlags.FireAndForget);
db.StringSet(key, key, flags: CommandFlags.FireAndForget);
((IRedisDebug)GetServer(muxer)).Quit(CommandFlags.FireAndForget);
GetServer(muxer).Quit(CommandFlags.FireAndForget);
var watch = Stopwatch.StartNew();
try
{
......@@ -550,7 +550,7 @@ public void TestSevered(bool preserveOrder)
string key = Guid.NewGuid().ToString();
db.KeyDelete(key, CommandFlags.FireAndForget);
db.StringSet(key, key, flags: CommandFlags.FireAndForget);
((IRedisServerDebug)GetServer(muxer)).SimulateConnectionFailure();
GetServer(muxer).SimulateConnectionFailure();
var watch = Stopwatch.StartNew();
db.Ping();
watch.Stop();
......
......@@ -146,19 +146,19 @@ public void IntentionalWrongServer()
#if DEBUG
string a = ((IRedisServerDebug)conn.GetServer(rightMasterNode.EndPoint)).StringGet(db.Database, key);
string a = conn.GetServer(rightMasterNode.EndPoint).StringGet(db.Database, key);
Assert.AreEqual(value, a, "right master");
var node = config.Nodes.FirstOrDefault(x => !x.IsSlave && x.NodeId != rightMasterNode.NodeId);
Assert.IsNotNull(node);
if (node != null)
{
string b = ((IRedisServerDebug)conn.GetServer(node.EndPoint)).StringGet(db.Database, key);
string b = conn.GetServer(node.EndPoint).StringGet(db.Database, key);
Assert.AreEqual(value, b, "wrong master, allow redirect");
try
{
string c = ((IRedisServerDebug)conn.GetServer(node.EndPoint)).StringGet(db.Database, key, CommandFlags.NoRedirect);
string c = conn.GetServer(node.EndPoint).StringGet(db.Database, key, CommandFlags.NoRedirect);
Assert.Fail("wrong master, no redirect");
} catch (RedisServerException ex)
{
......@@ -170,7 +170,7 @@ public void IntentionalWrongServer()
Assert.IsNotNull(node);
if (node != null)
{
string d = ((IRedisServerDebug)conn.GetServer(node.EndPoint)).StringGet(db.Database, key);
string d = conn.GetServer(node.EndPoint).StringGet(db.Database, key);
Assert.AreEqual(value, d, "right slave");
}
......@@ -178,12 +178,12 @@ public void IntentionalWrongServer()
Assert.IsNotNull(node);
if (node != null)
{
string e = ((IRedisServerDebug)conn.GetServer(node.EndPoint)).StringGet(db.Database, key);
string e = conn.GetServer(node.EndPoint).StringGet(db.Database, key);
Assert.AreEqual(value, e, "wrong slave, allow redirect");
try
{
string f = ((IRedisServerDebug)conn.GetServer(node.EndPoint)).StringGet(db.Database, key, CommandFlags.NoRedirect);
string f = conn.GetServer(node.EndPoint).StringGet(db.Database, key, CommandFlags.NoRedirect);
Assert.Fail("wrong slave, no redirect");
}
catch (RedisServerException ex)
......
......@@ -116,7 +116,7 @@ public void ClientName()
var conn = muxer.GetDatabase();
conn.Ping();
#if DEBUG
var name = ((IRedisDebug)GetServer(muxer)).ClientGetName();
var name = GetServer(muxer).ClientGetName();
Assert.AreEqual("TestRig", name);
#endif
}
......
......@@ -41,7 +41,7 @@ public void ShutdownRaisesConnectionFailedAndRestore()
#if DEBUG
conn.AllowConnect = false;
var server = (IRedisServerDebug)conn.GetServer(PrimaryServer, PrimaryPort);
var server = conn.GetServer(PrimaryServer, PrimaryPort);
SetExpectedAmbientFailureCount(2);
server.SimulateConnectionFailure();
......
......@@ -355,7 +355,7 @@ public void SubscriptionsSurviveConnectionFailure()
Assert.AreEqual(1, server.GetCounters().Subscription.SocketCount, "sockets");
#if DEBUG
((IRedisServerDebug)server).SimulateConnectionFailure();
server.SimulateConnectionFailure();
SetExpectedAmbientFailureCount(2);
#endif
......
......@@ -36,6 +36,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
ProjectSection(SolutionItems) = preProject
monobuild.bash = monobuild.bash
monobuild.cmd = monobuild.cmd
netbuild.cmd = netbuild.cmd
StackExchange.Redis.nuspec = StackExchange.Redis.nuspec
EndProjectSection
EndProject
......
......@@ -136,6 +136,9 @@
<Compile Include="StackExchange\Redis\ServerType.cs" />
<Compile Include="StackExchange\Redis\SetOperation.cs" />
<Compile Include="StackExchange\Redis\SocketManager.cs" />
<Compile Include="StackExchange\Redis\SocketManager.NoPoll.cs">
<DependentUpon>SocketManager.cs</DependentUpon>
</Compile>
<Compile Include="StackExchange\Redis\SortType.cs" />
<Compile Include="StackExchange\Redis\StringSplits.cs" />
<Compile Include="StackExchange\Redis\TaskSource.cs" />
......@@ -143,6 +146,11 @@
<Compile Include="StackExchange\Redis\ShutdownMode.cs" />
<Compile Include="StackExchange\Redis\SaveType.cs" />
</ItemGroup>
<ItemGroup>
<Compile Include="StackExchange\Redis\SocketManager.Poll.cs">
<DependentUpon>SocketManager.cs</DependentUpon>
</Compile>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
......
......@@ -16,32 +16,6 @@ public sealed class ClientInfo
/// </summary>
public EndPoint Address { get; private set; }
/// <summary>
/// The host of the client (typically an IP address)
/// </summary>
public string Host
{
get
{
string host;
int port;
return Format.TryGetHostPort(Address, out host, out port) ? host : null;
}
}
/// <summary>
/// The port of the client
/// </summary>
public int Port
{
get
{
string host;
int port;
return Format.TryGetHostPort(Address, out host, out port) ? port : 0;
}
}
/// <summary>
/// total duration of the connection in seconds
/// </summary>
......@@ -73,6 +47,19 @@ public int Port
/// </summary>
public string FlagsRaw { get; private set; }
/// <summary>
/// The host of the client (typically an IP address)
/// </summary>
public string Host
{
get
{
string host;
int port;
return Format.TryGetHostPort(Address, out host, out port) ? host : null;
}
}
/// <summary>
/// idle time of the connection in seconds
/// </summary>
......@@ -93,6 +80,18 @@ public int Port
/// </summary>
public int PatternSubscriptionCount { get; private set; }
/// <summary>
/// The port of the client
/// </summary>
public int Port
{
get
{
string host;
int port;
return Format.TryGetHostPort(Address, out host, out port) ? port : 0;
}
}
/// <summary>
/// The raw content from redis
/// </summary>
......
......@@ -41,6 +41,22 @@ private SlotRange(short from, short to)
/// </summary>
public int To { get { return to; } }
/// <summary>
/// Indicates whether two ranges are not equal
/// </summary>
public static bool operator !=(SlotRange x, SlotRange y)
{
return x.from != y.from || x.to != y.to;
}
/// <summary>
/// Indicates whether two ranges are equal
/// </summary>
public static bool operator ==(SlotRange x, SlotRange y)
{
return x.from == y.from && x.to == y.to;
}
/// <summary>
/// Try to parse a string as a range
/// </summary>
......@@ -93,22 +109,6 @@ public override bool Equals(object obj)
}
return false;
}
/// <summary>
/// Indicates whether two ranges are equal
/// </summary>
public static bool operator ==(SlotRange x, SlotRange y)
{
return x.from == y.from && x.to == y.to;
}
/// <summary>
/// Indicates whether two ranges are not equal
/// </summary>
public static bool operator !=(SlotRange x, SlotRange y)
{
return x.from != y.from || x.to != y.to;
}
/// <summary>
/// Indicates whether two ranges are equal
/// </summary>
......@@ -122,7 +122,8 @@ public bool Equals(SlotRange range)
/// </summary>
public override int GetHashCode()
{
return (int)from | ((int)to << 16);
int x = from, y = to; // makes CS0675 a little happier
return x | (y << 16);
}
/// <summary>
......
......@@ -16,6 +16,7 @@ sealed partial class CompletionManager
private readonly string name;
int activeAsyncWorkerThread = 0;
long completedSync, completedAsync, failedAsync;
public CompletionManager(ConnectionMultiplexer multiplexer, string name)
{
......@@ -110,8 +111,6 @@ private static void ProcessAsyncCompletionQueue(object state)
}
partial void OnCompletedAsync();
int activeAsyncWorkerThread = 0;
private void ProcessAsyncCompletionQueueImpl()
{
int currentThread = Environment.CurrentManagedThreadId;
......
......@@ -12,8 +12,6 @@ public abstract class Condition
private Condition() { }
internal abstract int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy);
/// <summary>
/// Enforces that the given hash-field must have the specified value
/// </summary>
......@@ -53,43 +51,44 @@ public static Condition HashNotExists(RedisKey key, RedisValue hashField)
}
/// <summary>
/// Enforces that the given key must have the specified value
/// Enforces that the given key must exist
/// </summary>
public static Condition StringEqual(RedisKey key, RedisValue value)
public static Condition KeyExists(RedisKey key)
{
if (value.IsNull) return KeyNotExists(key);
return new EqualsCondition(key, RedisValue.Null, true, value);
return new ExistsCondition(key, RedisValue.Null, true);
}
/// <summary>
/// Enforces that the given key must exist
/// Enforces that the given key must not exist
/// </summary>
public static Condition KeyExists(RedisKey key)
public static Condition KeyNotExists(RedisKey key)
{
return new ExistsCondition(key, RedisValue.Null, true);
return new ExistsCondition(key, RedisValue.Null, false);
}
/// <summary>
/// Enforces that the given key must not have the specified value
/// Enforces that the given key must have the specified value
/// </summary>
public static Condition StringNotEqual(RedisKey key, RedisValue value)
public static Condition StringEqual(RedisKey key, RedisValue value)
{
if (value.IsNull) return KeyExists(key);
return new EqualsCondition(key, RedisValue.Null, false, value);
if (value.IsNull) return KeyNotExists(key);
return new EqualsCondition(key, RedisValue.Null, true, value);
}
/// <summary>
/// Enforces that the given key must not exist
/// Enforces that the given key must not have the specified value
/// </summary>
public static Condition KeyNotExists(RedisKey key)
public static Condition StringNotEqual(RedisKey key, RedisValue value)
{
return new ExistsCondition(key, RedisValue.Null, false);
if (value.IsNull) return KeyExists(key);
return new EqualsCondition(key, RedisValue.Null, false, value);
}
internal abstract void CheckCommands(CommandMap commandMap);
internal abstract IEnumerable<Message> CreateMessages(int db, ResultBox resultBox);
internal abstract int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy);
internal abstract bool TryValidate(RawResult result, out bool value);
internal sealed class ConditionProcessor : ResultProcessor<bool>
......@@ -159,15 +158,12 @@ public ExistsCondition(RedisKey key, RedisValue hashField, bool expectedResult)
this.expectedResult = expectedResult;
}
internal override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
return serverSelectionStrategy.HashSlot(key);
}
public override string ToString()
{
return (hashField.IsNull ? key.ToString() : key + " > " + hashField)
+ (expectedResult ? " exists" : " does not exists");
}
internal override void CheckCommands(CommandMap commandMap)
{
commandMap.AssertAvailable(hashField.IsNull ? RedisCommand.EXISTS : RedisCommand.HEXISTS);
......@@ -182,6 +178,11 @@ internal override IEnumerable<Message> CreateMessages(int db, ResultBox resultBo
message.SetSource(ConditionProcessor.Default, resultBox);
yield return message;
}
internal override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
return serverSelectionStrategy.HashSlot(key);
}
internal override bool TryValidate(RawResult result, out bool value)
{
bool parsed;
......@@ -210,11 +211,6 @@ public EqualsCondition(RedisKey key, RedisValue hashField, bool expectedEqual, R
this.expectedValue = expectedValue;
}
internal override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
return serverSelectionStrategy.HashSlot(key);
}
public override string ToString()
{
return (hashField.IsNull ? key.ToString() : key + " > " + hashField)
......@@ -236,6 +232,11 @@ internal sealed override IEnumerable<Message> CreateMessages(int db, ResultBox r
message.SetSource(ConditionProcessor.Default, resultBox);
yield return message;
}
internal override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
return serverSelectionStrategy.HashSlot(key);
}
internal override bool TryValidate(RawResult result, out bool value)
{
switch (result.Type)
......
......@@ -38,62 +38,53 @@ public sealed class ConfigurationOptions : ICloneable
ConfigChannelPrefix = "configChannel=", AbortOnConnectFailPrefix = "abortConnect=", ResolveDnsPrefix = "resolveDns=",
ChannelPrefixPrefix = "channelPrefix=", ProxyPrefix = "proxy=";
private readonly EndPointCollection endpoints = new EndPointCollection();
/// <summary>
/// Automatically encodes and decodes channels
/// </summary>
public RedisChannel ChannelPrefix { get;set; }
private bool? allowAdmin, abortOnConnectFail, resolveDns;
private Proxy? proxy;
private CommandMap commandMap;
private string clientName, serviceName, password, tieBreaker, sslHost, configChannel;
private Version defaultVersion;
private int? keepAlive, syncTimeout, connectTimeout, writeBuffer;
private readonly EndPointCollection endpoints = new EndPointCollection();
private bool? allowAdmin, abortOnConnectFail, resolveDns;
private string clientName, serviceName, password, tieBreaker, sslHost, configChannel;
private CommandMap commandMap;
private Version defaultVersion;
private int? keepAlive, syncTimeout, connectTimeout, writeBuffer;
private Proxy? proxy;
/// <summary>
/// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication; note
/// that this cannot be specified in the configuration-string.
/// </summary>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1009:DeclareEventHandlersCorrectly")]
public event LocalCertificateSelectionCallback CertificateSelection;
public event LocalCertificateSelectionCallback CertificateSelection;
/// <summary>
/// A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party; note
/// that this cannot be specified in the configuration-string.
/// </summary>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1009:DeclareEventHandlersCorrectly")]
public event RemoteCertificateValidationCallback CertificateValidation;
/// <summary>
/// Gets or sets the SocketManager instance to be used with these options; if this is null a per-multiplexer
/// SocketManager is created automatically.
/// </summary>
public SocketManager SocketManager { get;set; }
public event RemoteCertificateValidationCallback CertificateValidation;
/// <summary>
/// Indicates whether admin operations should be allowed
/// Gets or sets whether connect/configuration timeouts should be explicitly notified via a TimeoutException
/// </summary>
public Proxy Proxy { get { return proxy.GetValueOrDefault(); } set { proxy = value; } }
public bool AbortOnConnectFail { get { return abortOnConnectFail ?? true; } set { abortOnConnectFail = value; } }
/// <summary>
/// Indicates whether admin operations should be allowed
/// </summary>
public bool AllowAdmin { get { return allowAdmin.GetValueOrDefault(); } set { allowAdmin = value; } }
public bool AllowAdmin { get { return allowAdmin.GetValueOrDefault(); } set { allowAdmin = value; } }
/// <summary>
/// Indicates whether endpoints should be resolved via DNS before connecting
/// Automatically encodes and decodes channels
/// </summary>
public bool ResolveDns { get { return resolveDns.GetValueOrDefault(); } set { resolveDns = value; } }
public RedisChannel ChannelPrefix { get;set; }
/// <summary>
/// The client name to user for all connections
/// </summary>
public string ClientName { get { return clientName; } set { clientName = value; } }
public string ClientName { get { return clientName; } set { clientName = value; } }
/// <summary>
/// The command-map associated with this configuration
/// </summary>
......@@ -102,7 +93,7 @@ public CommandMap CommandMap
get
{
if (commandMap != null) return commandMap;
switch(Proxy)
switch (Proxy)
{
case Redis.Proxy.Twemproxy:
return CommandMap.Twemproxy;
......@@ -110,47 +101,63 @@ public CommandMap CommandMap
return CommandMap.Default;
}
}
set {
set
{
if (value == null) throw new ArgumentNullException("value");
commandMap = value;
}
}
}
/// <summary>
/// Channel to use for broadcasting and listening for configuration change notification
/// </summary>
public string ConfigurationChannel { get { return configChannel ?? DefaultConfigurationChannel; } set { configChannel = value; } }
public string ConfigurationChannel { get { return configChannel ?? DefaultConfigurationChannel; } set { configChannel = value; } }
/// <summary>
/// Specifies the time in milliseconds that should be allowed for connection
/// </summary>
public int ConnectTimeout { get { return connectTimeout ?? SyncTimeout; } set { connectTimeout = value; } }
public int ConnectTimeout { get { return connectTimeout ?? SyncTimeout; } set { connectTimeout = value; } }
/// <summary>
/// The server version to assume
/// </summary>
public Version DefaultVersion { get { return defaultVersion ?? RedisFeatures.v2_0_0; } set { defaultVersion = value; } }
public Version DefaultVersion { get { return defaultVersion ?? RedisFeatures.v2_0_0; } set { defaultVersion = value; } }
/// <summary>
/// The endpoints defined for this configuration
/// </summary>
public EndPointCollection EndPoints { get { return endpoints; } }
public EndPointCollection EndPoints { get { return endpoints; } }
/// <summary>
/// Specifies the time in seconds at which connections should be pinged to ensure validity
/// </summary>
public int KeepAlive { get { return keepAlive.GetValueOrDefault(-1); } set { keepAlive = value; } }
public int KeepAlive { get { return keepAlive.GetValueOrDefault(-1); } set { keepAlive = value; } }
/// <summary>
/// The password to use to authenticate with the server
/// </summary>
public string Password { get { return password; } set { password = value; } }
public string Password { get { return password; } set { password = value; } }
/// <summary>
/// Indicates whether admin operations should be allowed
/// </summary>
public Proxy Proxy { get { return proxy.GetValueOrDefault(); } set { proxy = value; } }
/// <summary>
/// Indicates whether endpoints should be resolved via DNS before connecting
/// </summary>
public bool ResolveDns { get { return resolveDns.GetValueOrDefault(); } set { resolveDns = value; } }
/// <summary>
/// The service name used to resolve a service via sentinel
/// </summary>
public string ServiceName { get { return serviceName; } set { serviceName = value; } }
public string ServiceName { get { return serviceName; } set { serviceName = value; } }
/// <summary>
/// Gets or sets the SocketManager instance to be used with these options; if this is null a per-multiplexer
/// SocketManager is created automatically.
/// </summary>
public SocketManager SocketManager { get;set; }
/// <summary>
/// The target-host to use when validating SSL certificate; setting a value here enables SSL mode
/// </summary>
......@@ -174,12 +181,6 @@ public CommandMap CommandMap
// these just rip out the underlying handlers, bypassing the event accessors - needed when creating the SSL stream
internal RemoteCertificateValidationCallback CertificateValidationCallback { get { return CertificateValidation; } private set { CertificateValidation = value; } }
/// <summary>
/// Gets or sets whether connect/configuration timeouts should be explicitly notified via a TimeoutException
/// </summary>
public bool AbortOnConnectFail { get { return abortOnConnectFail ?? true; } set { abortOnConnectFail = value; } }
/// <summary>
/// Parse the configuration from a comma-delimited configuration string
/// </summary>
......
using System;
using System.Text;
using System.Text;
namespace StackExchange.Redis
{
......@@ -43,41 +42,15 @@ public bool IsEmpty
}
/// <summary>
/// The number of operations performed on this connection
/// Indicates the total number of messages despatched to a non-preferred endpoint, for example sent to a master
/// when the caller stated a preference of slave
/// </summary>
public long OperationCount { get; internal set; }
public long NonPreferredEndpointCount { get; internal set; }
/// <summary>
/// The number of subscriptions (with and without patterns) currently held against this connection
/// The number of operations performed on this connection
/// </summary>
public long Subscriptions { get;internal set; }
internal void Add(ConnectionCounters other)
{
if (other == null) return;
this.CompletedAsynchronously += other.CompletedAsynchronously;
this.CompletedSynchronously += other.CompletedSynchronously;
this.FailedAsynchronously += other.FailedAsynchronously;
this.OperationCount += other.OperationCount;
this.PendingUnsentItems += other.PendingUnsentItems;
this.ResponsesAwaitingAsyncCompletion += other.ResponsesAwaitingAsyncCompletion;
this.SentItemsAwaitingResponse += other.SentItemsAwaitingResponse;
this.SocketCount += other.SocketCount;
this.Subscriptions += other.Subscriptions;
this.WriterCount += other.WriterCount;
this.NonPreferredEndpointCount += other.NonPreferredEndpointCount;
}
internal bool Any()
{
return CompletedAsynchronously != 0 || CompletedSynchronously != 0
|| FailedAsynchronously != 0 || OperationCount != 0
|| PendingUnsentItems != 0 || ResponsesAwaitingAsyncCompletion != 0
|| SentItemsAwaitingResponse != 0 || SocketCount != 0
|| Subscriptions != 0 || WriterCount != 0
|| NonPreferredEndpointCount != 0;
}
public long OperationCount { get; internal set; }
/// <summary>
/// Operations that have been requested, but which have not yet been sent to the server
......@@ -99,6 +72,11 @@ internal bool Any()
/// </summary>
public long SocketCount { get; internal set; }
/// <summary>
/// The number of subscriptions (with and without patterns) currently held against this connection
/// </summary>
public long Subscriptions { get;internal set; }
/// <summary>
/// Indicates the total number of outstanding items against this connection
/// </summary>
......@@ -108,13 +86,6 @@ internal bool Any()
/// Indicates the total number of writers items against this connection
/// </summary>
public int WriterCount { get; internal set; }
/// <summary>
/// Indicates the total number of messages despatched to a non-preferred endpoint, for example sent to a master
/// when the caller stated a preference of slave
/// </summary>
public long NonPreferredEndpointCount { get; internal set; }
/// <summary>
/// See Object.ToString()
......@@ -126,6 +97,31 @@ public override string ToString()
return sb.ToString();
}
internal void Add(ConnectionCounters other)
{
if (other == null) return;
this.CompletedAsynchronously += other.CompletedAsynchronously;
this.CompletedSynchronously += other.CompletedSynchronously;
this.FailedAsynchronously += other.FailedAsynchronously;
this.OperationCount += other.OperationCount;
this.PendingUnsentItems += other.PendingUnsentItems;
this.ResponsesAwaitingAsyncCompletion += other.ResponsesAwaitingAsyncCompletion;
this.SentItemsAwaitingResponse += other.SentItemsAwaitingResponse;
this.SocketCount += other.SocketCount;
this.Subscriptions += other.Subscriptions;
this.WriterCount += other.WriterCount;
this.NonPreferredEndpointCount += other.NonPreferredEndpointCount;
}
internal bool Any()
{
return CompletedAsynchronously != 0 || CompletedSynchronously != 0
|| FailedAsynchronously != 0 || OperationCount != 0
|| PendingUnsentItems != 0 || ResponsesAwaitingAsyncCompletion != 0
|| SentItemsAwaitingResponse != 0 || SocketCount != 0
|| Subscriptions != 0 || WriterCount != 0
|| NonPreferredEndpointCount != 0;
}
internal void Append(StringBuilder sb)
{
sb.Append("ops=").Append(OperationCount).Append(", qu=").Append(PendingUnsentItems)
......
......@@ -9,8 +9,8 @@ namespace StackExchange.Redis
/// </summary>
public sealed class ConnectionFailedEventArgs : EventArgs, ICompletable
{
private readonly EndPoint endpoint;
private readonly ConnectionType connectionType;
private readonly EndPoint endpoint;
private readonly Exception exception;
private readonly ConnectionFailureType failureType;
private readonly EventHandler<ConnectionFailedEventArgs> handler;
......@@ -26,21 +26,20 @@ internal ConnectionFailedEventArgs(EventHandler<ConnectionFailedEventArgs> handl
}
/// <summary>
/// Gets the failing server-endpoint
/// Gets the connection-type of the failing connection
/// </summary>
public EndPoint EndPoint
public ConnectionType ConnectionType
{
get { return endpoint; }
get { return connectionType; }
}
/// <summary>
/// Gets the connection-type of the failing connection
/// Gets the failing server-endpoint
/// </summary>
public ConnectionType ConnectionType
public EndPoint EndPoint
{
get { return connectionType; }
get { return endpoint; }
}
/// <summary>
/// Gets the exception if available (this can be null)
/// </summary>
......@@ -56,16 +55,16 @@ public ConnectionFailureType FailureType
{
get { return failureType; }
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
void ICompletable.AppendStormLog(StringBuilder sb)
{
sb.Append("event, connection-failed: ");
if (endpoint == null) sb.Append("n/a");
else sb.Append(Format.ToString(endpoint));
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
}
}
using System;
using System.Text;
using System.Threading;
using System.Diagnostics;
using System.IO;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Collections.Generic;
namespace StackExchange.Redis
{
......@@ -25,10 +20,7 @@ public static long GetAllocationCount()
Interlocked.Increment(ref ResultBox.allocations);
}
}
/// <summary>
/// Additional IRedisServer methods for debugging
/// </summary>
public interface IRedisServerDebug : IServer
partial interface IServer
{
/// <summary>
/// Show what is in the pending (unsent) queue
......@@ -65,10 +57,7 @@ public interface IRedisServerDebug : IServer
/// <remarks>http://redis.io/commands/client-pause</remarks>
void Hang(TimeSpan duration, CommandFlags flags = CommandFlags.None);
}
/// <summary>
/// Additional IRedis methods for debugging
/// </summary>
public interface IRedisDebug : IRedis, IRedisDebugAsync
partial interface IRedis
{
/// <summary>
/// The CLIENT GETNAME returns the name of the current connection as set by CLIENT SETNAME. Since every new connection starts without an associated name, if no name was assigned a null string is returned.
......@@ -83,10 +72,8 @@ public interface IRedisDebug : IRedis, IRedisDebugAsync
/// <remarks>http://redis.io/commands/quit</remarks>
void Quit(CommandFlags flags = CommandFlags.None);
}
/// <summary>
/// Additional IRedisAsync methods for debugging
/// </summary>
public interface IRedisDebugAsync : IRedisAsync
partial interface IRedisAsync
{
/// <summary>
/// The CLIENT GETNAME returns the name of the current connection as set by CLIENT SETNAME. Since every new connection starts without an associated name, if no name was assigned a null string is returned.
......@@ -95,15 +82,15 @@ public interface IRedisDebugAsync : IRedisAsync
/// <returns>The connection name, or a null string if no name is set.</returns>
Task<string> ClientGetNameAsync(CommandFlags flags = CommandFlags.None);
}
partial class RedisBase : IRedisDebug
partial class RedisBase
{
string IRedisDebug.ClientGetName(CommandFlags flags)
string IRedis.ClientGetName(CommandFlags flags)
{
var msg = Message.Create(-1, flags, RedisCommand.CLIENT, RedisLiterals.GETNAME);
return ExecuteSync(msg, ResultProcessor.String);
}
Task<string> IRedisDebugAsync.ClientGetNameAsync(CommandFlags flags)
Task<string> IRedisAsync.ClientGetNameAsync(CommandFlags flags)
{
var msg = Message.Create(-1, flags, RedisCommand.CLIENT, RedisLiterals.GETNAME);
return ExecuteAsync(msg, ResultProcessor.String);
......@@ -130,23 +117,23 @@ internal string ListPending(int maxCount)
}
}
partial class RedisServer : IRedisServerDebug
partial class RedisServer
{
void IRedisServerDebug.SimulateConnectionFailure()
void IServer.SimulateConnectionFailure()
{
server.SimulateConnectionFailure();
}
string IRedisServerDebug.ListPending(int maxCount)
string IServer.ListPending(int maxCount)
{
return server.ListPending(maxCount);
}
void IRedisServerDebug.Crash()
void IServer.Crash()
{
// using DB-0 because we also use "DEBUG OBJECT", which is db-centric
var msg = Message.Create(0, CommandFlags.FireAndForget, RedisCommand.DEBUG, RedisLiterals.SEGFAULT);
ExecuteSync(msg, ResultProcessor.DemandOK);
}
void IRedisServerDebug.Hang(TimeSpan duration, CommandFlags flags)
void IServer.Hang(TimeSpan duration, CommandFlags flags)
{
var msg = Message.Create(0, flags, RedisCommand.CLIENT, RedisLiterals.PAUSE, (long)duration.TotalMilliseconds);
ExecuteSync(msg, ResultProcessor.DemandOK);
......
......@@ -9,13 +9,6 @@ namespace StackExchange.Redis
/// </summary>
public sealed class EndPointCollection : Collection<EndPoint>
{
/// <summary>
/// Attempt to parse a string into an EndPoint
/// </summary>
public static EndPoint TryParse(string endpoint)
{
return Format.TryParseEndPoint(endpoint);
}
/// <summary>
/// Format an endpoint
/// </summary>
......@@ -24,6 +17,13 @@ public static string ToString(EndPoint endpoint)
return Format.ToString(endpoint);
}
/// <summary>
/// Attempt to parse a string into an EndPoint
/// </summary>
public static EndPoint TryParse(string endpoint)
{
return Format.TryParseEndPoint(endpoint);
}
/// <summary>
/// Adds a new endpoint to the list
/// </summary>
......
......@@ -26,10 +26,6 @@ public EndPoint EndPoint
{
get { return endpoint; }
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
void ICompletable.AppendStormLog(StringBuilder sb)
{
sb.Append("event, endpoint: ");
......@@ -37,5 +33,9 @@ void ICompletable.AppendStormLog(StringBuilder sb)
else sb.Append(Format.ToString(endpoint));
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
}
}
\ No newline at end of file
......@@ -4,6 +4,9 @@ namespace StackExchange.Redis
{
internal static class ExceptionFactory
{
const string DataCommandKey = "redis-command",
DataServerKey = "redis-server";
internal static Exception AdminModeNotEnabled(bool includeDetail, RedisCommand command, Message message, ServerEndPoint server)
{
string s = GetLabel(includeDetail, command, message);
......@@ -11,42 +14,26 @@ internal static Exception AdminModeNotEnabled(bool includeDetail, RedisCommand c
if (includeDetail) AddDetail(ex, message, server, s);
return ex;
}
static string GetLabel(bool includeDetail, RedisCommand command, Message message)
{
return message == null ? command.ToString() : (includeDetail ? message.CommandAndKey : message.Command.ToString());
}
internal static Exception NoConnectionAvailable(bool includeDetail, RedisCommand command, Message message, ServerEndPoint server)
internal static Exception CommandDisabled(bool includeDetail, RedisCommand command, Message message, ServerEndPoint server)
{
string s = GetLabel(includeDetail, command, message);
var ex = new RedisConnectionException(ConnectionFailureType.UnableToResolvePhysicalConnection, "No connection is available to service this operation: " + s);
var ex = new RedisCommandException("This operation has been disabled in the command-map and cannot be used: " + s);
if (includeDetail) AddDetail(ex, message, server, s);
return ex;
}
const string DataCommandKey = "redis-command",
DataServerKey = "redis-server";
private static void AddDetail(Exception exception, Message message, ServerEndPoint server, string label)
{
if (exception != null)
{
if (message != null) exception.Data.Add(DataCommandKey, message.CommandAndKey);
else if(label != null) exception.Data.Add(DataCommandKey, label);
if (server != null) exception.Data.Add(DataServerKey, Format.ToString(server.EndPoint));
}
}
internal static Exception CommandDisabled(bool includeDetail, RedisCommand command, Message message, ServerEndPoint server)
internal static Exception ConnectionFailure(bool includeDetail, ConnectionFailureType failureType, string message, ServerEndPoint server)
{
string s = GetLabel(includeDetail, command, message);
var ex = new RedisCommandException("This operation has been disabled in the command-map and cannot be used: " + s);
if (includeDetail) AddDetail(ex, message, server, s);
var ex = new RedisConnectionException(failureType, message);
if (includeDetail) AddDetail(ex, null, server, null);
return ex;
}
internal static Exception MultiSlot(bool includeDetail, Message message)
internal static Exception DatabaseNotRequired(bool includeDetail, RedisCommand command)
{
var ex = new RedisCommandException("Multi-key operations must involve a single slot; keys can use 'hash tags' to help this, i.e. '{/users/12345}/account' and '{/users/12345}/contacts' will always be in the same slot");
if (includeDetail) AddDetail(ex, message, null, null);
string s = command.ToString();
var ex = new RedisCommandException("A target database is not required for " + s);
if (includeDetail) AddDetail(ex, null, null, s);
return ex;
}
......@@ -65,14 +52,6 @@ internal static Exception DatabaseRequired(bool includeDetail, RedisCommand comm
return ex;
}
internal static Exception DatabaseNotRequired(bool includeDetail, RedisCommand command)
{
string s = command.ToString();
var ex = new RedisCommandException("A target database is not required for " + s);
if (includeDetail) AddDetail(ex, null, null, s);
return ex;
}
internal static Exception MasterOnly(bool includeDetail, RedisCommand command, Message message, ServerEndPoint server)
{
string s = GetLabel(includeDetail, command, message);
......@@ -81,17 +60,18 @@ internal static Exception MasterOnly(bool includeDetail, RedisCommand command, M
return ex;
}
internal static Exception Timeout(bool includeDetail, string errorMessage, Message message, ServerEndPoint server)
internal static Exception MultiSlot(bool includeDetail, Message message)
{
var ex = new TimeoutException(errorMessage);
if (includeDetail) AddDetail(ex, message, server, null);
var ex = new RedisCommandException("Multi-key operations must involve a single slot; keys can use 'hash tags' to help this, i.e. '{/users/12345}/account' and '{/users/12345}/contacts' will always be in the same slot");
if (includeDetail) AddDetail(ex, message, null, null);
return ex;
}
internal static Exception ConnectionFailure(bool includeDetail, ConnectionFailureType failureType, string message, ServerEndPoint server)
internal static Exception NoConnectionAvailable(bool includeDetail, RedisCommand command, Message message, ServerEndPoint server)
{
var ex = new RedisConnectionException(failureType, message);
if (includeDetail) AddDetail(ex, null, server, null);
string s = GetLabel(includeDetail, command, message);
var ex = new RedisConnectionException(ConnectionFailureType.UnableToResolvePhysicalConnection, "No connection is available to service this operation: " + s);
if (includeDetail) AddDetail(ex, message, server, s);
return ex;
}
......@@ -102,5 +82,28 @@ internal static Exception NotSupported(bool includeDetail, RedisCommand command)
if (includeDetail) AddDetail(ex, null, null, s);
return ex;
}
internal static Exception Timeout(bool includeDetail, string errorMessage, Message message, ServerEndPoint server)
{
var ex = new TimeoutException(errorMessage);
if (includeDetail) AddDetail(ex, message, server, null);
return ex;
}
private static void AddDetail(Exception exception, Message message, ServerEndPoint server, string label)
{
if (exception != null)
{
if (message != null) exception.Data.Add(DataCommandKey, message.CommandAndKey);
else if (label != null) exception.Data.Add(DataCommandKey, label);
if (server != null) exception.Data.Add(DataServerKey, Format.ToString(server.EndPoint));
}
}
static string GetLabel(bool includeDetail, RedisCommand command, Message message)
{
return message == null ? command.ToString() : (includeDetail ? message.CommandAndKey : message.Command.ToString());
}
}
}
......@@ -6,14 +6,16 @@ namespace StackExchange.Redis
{
internal static class Format
{
public static bool TryParseInt32(string s, out int value)
{
return int.TryParse(s, NumberStyles.Integer, NumberFormatInfo.InvariantInfo, out value);
}
public static int ParseInt32(string s)
{
return int.Parse(s, NumberStyles.Integer, NumberFormatInfo.InvariantInfo);
}
public static string ToString(int value)
{
return value.ToString(NumberFormatInfo.InvariantInfo);
}
public static bool TryParseBoolean(string s, out bool value)
{
if (bool.TryParse(s, out value)) return true;
......@@ -32,39 +34,20 @@ public static bool TryParseBoolean(string s, out bool value)
return false;
}
public static string ToString(int value)
public static bool TryParseInt32(string s, out int value)
{
return value.ToString(NumberFormatInfo.InvariantInfo);
return int.TryParse(s, NumberStyles.Integer, NumberFormatInfo.InvariantInfo, out value);
}
internal static string ToString(long value)
internal static EndPoint ParseEndPoint(string host, int port)
{
return value.ToString(NumberFormatInfo.InvariantInfo);
IPAddress ip;
if (IPAddress.TryParse(host, out ip)) return new IPEndPoint(ip, port);
return new DnsEndPoint(host, port);
}
internal static bool TryParseDouble(string s, out double value)
internal static string ToString(long value)
{
if(s == null || s.Length == 0)
{
value = 0;
return false;
}
if(s.Length==1 && s[0] >= '0' && s[1] <= '9')
{
value = (int)(s[0] - '0');
return true;
}
// need to handle these
if(string.Equals("+inf", s, StringComparison.OrdinalIgnoreCase))
{
value = double.PositiveInfinity;
return true;
}
if(string.Equals("-inf", s, StringComparison.OrdinalIgnoreCase))
{
value = double.NegativeInfinity;
return true;
}
return double.TryParse(s, NumberStyles.Any, NumberFormatInfo.InvariantInfo, out value);
return value.ToString(NumberFormatInfo.InvariantInfo);
}
internal static string ToString(double value)
......@@ -76,6 +59,7 @@ internal static string ToString(double value)
}
return value.ToString("G17", NumberFormatInfo.InvariantInfo);
}
internal static string ToString(object value)
{
return Convert.ToString(value, CultureInfo.InvariantCulture);
......@@ -119,11 +103,31 @@ internal static bool TryGetHostPort(EndPoint endpoint, out string host, out int
port = 0;
return false;
}
internal static EndPoint ParseEndPoint(string host, int port)
internal static bool TryParseDouble(string s, out double value)
{
IPAddress ip;
if (IPAddress.TryParse(host, out ip)) return new IPEndPoint(ip, port);
return new DnsEndPoint(host, port);
if(s == null || s.Length == 0)
{
value = 0;
return false;
}
if(s.Length==1 && s[0] >= '0' && s[1] <= '9')
{
value = (int)(s[0] - '0');
return true;
}
// need to handle these
if(string.Equals("+inf", s, StringComparison.OrdinalIgnoreCase))
{
value = double.PositiveInfinity;
return true;
}
if(string.Equals("-inf", s, StringComparison.OrdinalIgnoreCase))
{
value = double.NegativeInfinity;
return true;
}
return double.TryParse(s, NumberStyles.Any, NumberFormatInfo.InvariantInfo, out value);
}
internal static EndPoint TryParseEndPoint(string endpoint)
{
......
......@@ -4,7 +4,8 @@ namespace StackExchange.Redis
{
interface ICompletable
{
bool TryComplete(bool isAsync);
void AppendStormLog(StringBuilder sb);
bool TryComplete(bool isAsync);
}
}
......@@ -125,6 +125,13 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>http://redis.io/commands/hlen</remarks>
long HashLength(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// The HSCAN command is used to incrementally iterate over a hash
/// </summary>
/// <returns>yields all elements of the hash.</returns>
/// <remarks>http://redis.io/commands/hscan</remarks>
IEnumerable<KeyValuePair<RedisValue, RedisValue>> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanUtils.DefaultPageSize, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Sets the specified fields to their respective values in the hash stored at key. This command overwrites any existing fields in the hash. If key does not exist, a new key holding a hash is created.
/// </summary>
......@@ -214,6 +221,13 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>http://redis.io/commands/persist</remarks>
bool KeyPersist(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return a random key from the currently selected database.
/// </summary>
/// <returns>the random key, or nil when the database is empty.</returns>
/// <remarks>http://redis.io/commands/randomkey</remarks>
RedisKey KeyRandom(CommandFlags flags = CommandFlags.None);
/// <summary>
/// Renames key to newkey. It returns an error when the source and destination names are the same, or when key does not exist.
/// </summary>
......@@ -376,14 +390,6 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// Takes a lock (specifying a token value) if it is not already taken
/// </summary>
bool LockTake(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return a random key from the currently selected database.
/// </summary>
/// <returns>the random key, or nil when the database is empty.</returns>
/// <remarks>http://redis.io/commands/randomkey</remarks>
RedisKey KeyRandom(CommandFlags flags = CommandFlags.None);
/// <summary>
/// Execute a Lua script against the server
/// </summary>
......@@ -511,21 +517,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// </summary>
/// <returns>yields all elements of the set.</returns>
/// <remarks>http://redis.io/commands/sscan</remarks>
IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanIterator.DefaultPageSize, CommandFlags flags = CommandFlags.None);
/// <summary>
/// The ZSCAN command is used to incrementally iterate over a sorted set
/// </summary>
/// <returns>yields all elements of the sorted set.</returns>
/// <remarks>http://redis.io/commands/zscan</remarks>
IEnumerable<KeyValuePair<RedisValue, double>> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanIterator.DefaultPageSize, CommandFlags flags = CommandFlags.None);
/// <summary>
/// The HSCAN command is used to incrementally iterate over a hash
/// </summary>
/// <returns>yields all elements of the hash.</returns>
/// <remarks>http://redis.io/commands/hscan</remarks>
IEnumerable<KeyValuePair<RedisValue, RedisValue>> HashScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanIterator.DefaultPageSize, CommandFlags flags = CommandFlags.None);
IEnumerable<RedisValue> SetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanUtils.DefaultPageSize, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Sorts a list, set or sorted set (numerically or alphabetically, ascending by default); By default, the elements themselves are compared, but the values can also be
......@@ -603,6 +595,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <returns>the cardinality (number of elements) of the sorted set, or 0 if key does not exist.</returns>
/// <remarks>http://redis.io/commands/zcard</remarks>
long SortedSetLength(RedisKey key, double min = double.NegativeInfinity, double max = double.PositiveInfinity, Exclude exclude = Exclude.None, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Returns the specified range of elements in the sorted set stored at key. By default the elements are considered to be ordered from the lowest to the highest score. Lexicographical order is used for elements with equal score.
/// Both start and stop are zero-based indexes, where 0 is the first element, 1 is the next element and so on. They can also be negative numbers indicating offsets from the end of the sorted set, with -1 being the last element of the sorted set, -2 the penultimate element and so on.
......@@ -612,7 +605,6 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>http://redis.io/commands/zrevrange</remarks>
RedisValue[] SortedSetRangeByRank(RedisKey key, long start = 0, long stop = -1, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Returns the specified range of elements in the sorted set stored at key. By default the elements are considered to be ordered from the lowest to the highest score. Lexicographical order is used for elements with equal score.
/// Both start and stop are zero-based indexes, where 0 is the first element, 1 is the next element and so on. They can also be negative numbers indicating offsets from the end of the sorted set, with -1 being the last element of the sorted set, -2 the penultimate element and so on.
......@@ -682,6 +674,12 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>http://redis.io/commands/zremrangebyscore</remarks>
long SortedSetRemoveRangeByScore(RedisKey key, double start, double stop, Exclude exclude = Exclude.None, CommandFlags flags = CommandFlags.None);
/// <summary>
/// The ZSCAN command is used to incrementally iterate over a sorted set
/// </summary>
/// <returns>yields all elements of the sorted set.</returns>
/// <remarks>http://redis.io/commands/zscan</remarks>
IEnumerable<KeyValuePair<RedisValue, double>> SortedSetScan(RedisKey key, RedisValue pattern = default(RedisValue), int pageSize = RedisDatabase.ScanUtils.DefaultPageSize, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Returns the score of member in the sorted set at key; If member does not exist in the sorted set, or key does not exist, nil is returned.
/// </summary>
......
......@@ -200,6 +200,13 @@ public interface IDatabaseAsync : IRedisAsync
/// <remarks>http://redis.io/commands/persist</remarks>
Task<bool> KeyPersistAsync(RedisKey key, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return a random key from the currently selected database.
/// </summary>
/// <returns>the random key, or nil when the database is empty.</returns>
/// <remarks>http://redis.io/commands/randomkey</remarks>
Task<RedisKey> KeyRandomAsync(CommandFlags flags = CommandFlags.None);
/// <summary>
/// Renames key to newkey. It returns an error when the source and destination names are the same, or when key does not exist.
/// </summary>
......@@ -362,17 +369,6 @@ public interface IDatabaseAsync : IRedisAsync
/// Takes a lock (specifying a token value) if it is not already taken
/// </summary>
Task<bool> LockTakeAsync(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Return a random key from the currently selected database.
/// </summary>
/// <returns>the random key, or nil when the database is empty.</returns>
/// <remarks>http://redis.io/commands/randomkey</remarks>
Task<RedisKey> KeyRandomAsync(CommandFlags flags = CommandFlags.None);
/// <summary>
/// Execute a Lua script against the server
/// </summary>
......
......@@ -6,7 +6,7 @@ namespace StackExchange.Redis
/// <summary>
/// Common operations available to all redis connections
/// </summary>
public interface IRedis : IRedisAsync
public partial interface IRedis : IRedisAsync
{
/// <summary>
/// This command is often used to test if a connection is still alive, or to measure latency.
......
......@@ -6,14 +6,24 @@ namespace StackExchange.Redis
/// <summary>
/// Common operations available to all redis connections
/// </summary>
public interface IRedisAsync
public partial interface IRedisAsync
{
/// <summary>
/// Gets the multiplexer that created this instance
/// </summary>
ConnectionMultiplexer Multiplexer { get; }
/// <summary>
/// This command is often used to test if a connection is still alive, or to measure latency.
/// </summary>
/// <returns>The observed latency.</returns>
/// <remarks>http://redis.io/commands/ping</remarks>
Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None);
/// <summary>
/// Wait for a given asynchronous operation to complete (or timeout), reporting which
/// </summary>
bool TryWait(Task task);
/// <summary>
/// Wait for a given asynchronous operation to complete (or timeout)
/// </summary>
......@@ -27,15 +37,5 @@ public interface IRedisAsync
/// </summary>
void WaitAll(params Task[] tasks);
/// <summary>
/// Gets the multiplexer that created this instance
/// </summary>
ConnectionMultiplexer Multiplexer { get; }
/// <summary>
/// Wait for a given asynchronous operation to complete (or timeout), reporting which
/// </summary>
bool TryWait(Task task);
}
}
......@@ -10,7 +10,7 @@ namespace StackExchange.Redis
/// <summary>
/// Provides configuration controls of a redis server
/// </summary>
public interface IServer : IRedis
public partial interface IServer : IRedis
{
/// <summary>
/// Gets the cluster configuration associated with this server, if known
......
......@@ -10,6 +10,18 @@ namespace StackExchange.Redis
public interface ISubscriber : IRedis
{
/// <summary>
/// Inidicate exactly which redis server we are talking to
/// </summary>
[IgnoreNamePrefix]
EndPoint IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Inidicate exactly which redis server we are talking to
/// </summary>
[IgnoreNamePrefix]
Task<EndPoint> IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Indicates whether the instance can communicate with the server;
/// if a channel is specified, the existing subscription map is queried to
......@@ -35,22 +47,21 @@ public interface ISubscriber : IRedis
/// </summary>
/// <remarks>http://redis.io/commands/subscribe</remarks>
/// <remarks>http://redis.io/commands/psubscribe</remarks>
Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None);
void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Unsubscribe from a specified message channel; note; if no handler is specified, the subscription is cancelled regardless
/// of the subscribers; if a handler is specified, the subscription is only cancelled if this handler is the
/// last handler remaining against the channel
/// </summary>
/// <remarks>http://redis.io/commands/unsubscribe</remarks>
/// <remarks>http://redis.io/commands/punsubscribe</remarks>
Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Subscribe to perform some operation when a change to the preferred/active node is broadcast.
/// </summary>
/// <remarks>http://redis.io/commands/subscribe</remarks>
/// <remarks>http://redis.io/commands/psubscribe</remarks>
void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None);
Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Inidicate to which redis server we are actively subscribed for a given channel; returns null if
/// the channel is not actively subscribed
/// </summary>
[IgnoreNamePrefix]
EndPoint SubscribedEndpoint(RedisChannel channel);
/// <summary>
/// Unsubscribe from a specified message channel; note; if no handler is specified, the subscription is cancelled regardless
......@@ -76,23 +87,12 @@ public interface ISubscriber : IRedis
Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None);
/// <summary>
/// Inidicate exactly which redis server we are talking to
/// </summary>
[IgnoreNamePrefix]
EndPoint IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Inidicate exactly which redis server we are talking to
/// </summary>
[IgnoreNamePrefix]
Task<EndPoint> IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Inidicate to which redis server we are actively subscribed for a given channel; returns null if
/// the channel is not actively subscribed
/// Unsubscribe from a specified message channel; note; if no handler is specified, the subscription is cancelled regardless
/// of the subscribers; if a handler is specified, the subscription is only cancelled if this handler is the
/// last handler remaining against the channel
/// </summary>
[IgnoreNamePrefix]
EndPoint SubscribedEndpoint(RedisChannel channel);
/// <remarks>http://redis.io/commands/unsubscribe</remarks>
/// <remarks>http://redis.io/commands/punsubscribe</remarks>
Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None);
}
}
\ No newline at end of file
......@@ -9,13 +9,12 @@ namespace StackExchange.Redis
/// </summary>
public class InternalErrorEventArgs : EventArgs, ICompletable
{
private readonly object sender;
private readonly ConnectionType connectionType;
private readonly EndPoint endpoint;
private readonly Exception exception;
private readonly EventHandler<InternalErrorEventArgs> handler;
private readonly string origin;
private readonly object sender;
internal InternalErrorEventArgs(EventHandler<InternalErrorEventArgs> handler, object sender, EndPoint endpoint, ConnectionType connectionType, Exception exception, string origin)
{
this.handler = handler;
......@@ -26,21 +25,20 @@ internal InternalErrorEventArgs(EventHandler<InternalErrorEventArgs> handler, ob
this.origin = origin;
}
/// <summary>
/// Gets the failing server-endpoint (this can be null)
/// Gets the connection-type of the failing connection
/// </summary>
public EndPoint EndPoint
public ConnectionType ConnectionType
{
get { return endpoint; }
get { return connectionType; }
}
/// <summary>
/// Gets the connection-type of the failing connection
/// Gets the failing server-endpoint (this can be null)
/// </summary>
public ConnectionType ConnectionType
public EndPoint EndPoint
{
get { return connectionType; }
get { return endpoint; }
}
/// <summary>
/// Gets the exception if available (this can be null)
/// </summary>
......@@ -56,15 +54,15 @@ public string Origin
{
get { return origin; }
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
void ICompletable.AppendStormLog(StringBuilder sb)
{
sb.Append("event, internal-error: ").Append(origin);
if (endpoint != null) sb.Append(", ").Append(Format.ToString(endpoint));
}
bool ICompletable.TryComplete(bool isAsync)
{
return ConnectionMultiplexer.TryCompleteHandler(handler, sender, this, isAsync);
}
}
}
\ No newline at end of file
using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
namespace StackExchange.Redis
namespace StackExchange.Redis
{
#if LOGOUTPUT
sealed class LoggingTextStream : Stream
......@@ -144,4 +138,4 @@ public override void Write(byte[] buffer, int offset, int count)
}
}
#endif
}
}
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Text;
namespace StackExchange.Redis
......@@ -10,6 +9,8 @@ sealed partial class MessageQueue
regular = new Queue<Message>(),
high = new Queue<Message>();
public object SyncLock { get { return regular; } }
public Message Dequeue()
{
lock (regular)
......@@ -26,23 +27,6 @@ public Message Dequeue()
return null;
}
internal Message[] DequeueAll()
{
lock (regular)
{
int count = high.Count + regular.Count;
if (count == 0) return Message.EmptyArray;
var arr = new Message[count];
high.CopyTo(arr, 0);
regular.CopyTo(arr, high.Count);
high.Clear();
regular.Clear();
return arr;
}
}
public object SyncLock { get { return regular; } }
public Message PeekPing(out int queueLength)
{
lock (regular)
......@@ -70,6 +54,14 @@ public bool Push(Message message)
}
}
internal bool Any()
{
lock (regular)
{
return high.Count != 0 || regular.Count != 0;
}
}
internal int Count()
{
lock (regular)
......@@ -78,14 +70,21 @@ internal int Count()
}
}
internal bool Any()
internal Message[] DequeueAll()
{
lock(regular)
lock (regular)
{
return high.Count != 0 || regular.Count != 0;
int count = high.Count + regular.Count;
if (count == 0) return Message.EmptyArray;
var arr = new Message[count];
high.CopyTo(arr, 0);
regular.CopyTo(arr, high.Count);
high.Clear();
regular.Clear();
return arr;
}
}
internal void GetStormLog(StringBuilder sb)
{
lock(regular)
......
......@@ -45,6 +45,8 @@ public RawResult(RawResult[] arr)
public bool IsError { get { return resultType == ResultType.Error; } }
public ResultType Type { get { return resultType; } }
internal bool IsNull { get { return arr == null; } }
public override string ToString()
{
if (arr == null)
......@@ -65,32 +67,20 @@ public override string ToString()
return "(unknown)";
}
}
internal RedisKey AsRedisKey()
{
switch (resultType)
{
case ResultType.SimpleString:
case ResultType.BulkString:
return (RedisKey)GetBlob();
default:
throw new InvalidCastException("Cannot convert to RedisKey: " + resultType);
}
}
internal RedisChannel AsRedisChannel(byte[] channelPrefix)
{
switch(resultType)
switch (resultType)
{
case ResultType.SimpleString:
case ResultType.BulkString:
if(channelPrefix == null)
if (channelPrefix == null)
{
return (RedisChannel)GetBlob();
}
if(AssertStarts(channelPrefix))
if (AssertStarts(channelPrefix))
{
var src = (byte[])arr;
byte[] copy = new byte[count - channelPrefix.Length];
Buffer.BlockCopy(src, offset + channelPrefix.Length, copy, 0, copy.Length);
return (RedisChannel)copy;
......@@ -101,6 +91,17 @@ internal RedisChannel AsRedisChannel(byte[] channelPrefix)
}
}
internal RedisKey AsRedisKey()
{
switch (resultType)
{
case ResultType.SimpleString:
case ResultType.BulkString:
return (RedisKey)GetBlob();
default:
throw new InvalidCastException("Cannot convert to RedisKey: " + resultType);
}
}
internal RedisValue AsRedisValue()
{
switch (resultType)
......@@ -156,7 +157,6 @@ internal bool AssertStarts(byte[] expected)
}
return true;
}
internal bool IsNull { get { return arr == null; } }
internal byte[] GetBlob()
{
var src = (byte[])arr;
......@@ -182,16 +182,6 @@ internal bool GetBoolean()
}
}
internal bool TryGetInt64(out long value)
{
if (arr == null)
{
value = 0;
return false;
}
return RedisValue.TryParseInt64(arr as byte[], offset, count, out value);
}
internal RawResult[] GetItems()
{
return (RawResult[])arr;
......@@ -251,19 +241,29 @@ internal string GetString()
internal bool TryGetDouble(out double val)
{
if(arr == null)
if (arr == null)
{
val = 0;
return false;
}
long i64;
if(TryGetInt64(out i64))
if (TryGetInt64(out i64))
{
val = i64;
return true;
}
return Format.TryParseDouble(GetString(), out val);
}
internal bool TryGetInt64(out long value)
{
if (arr == null)
{
value = 0;
return false;
}
return RedisValue.TryParseInt64(arr as byte[], offset, count, out value);
}
}
}
......@@ -9,27 +9,13 @@ internal abstract partial class RedisBase : IRedis
internal readonly ConnectionMultiplexer multiplexer;
protected readonly object asyncState;
ConnectionMultiplexer IRedisAsync.Multiplexer { get { return multiplexer; } }
internal RedisBase(ConnectionMultiplexer multiplexer, object asyncState)
{
this.multiplexer = multiplexer;
this.asyncState = asyncState;
}
private ResultProcessor.TimingProcessor.TimerMessage GetTimerMessage(CommandFlags flags)
{
// do the best we can with available commands
var map = multiplexer.CommandMap;
if(map.IsAvailable(RedisCommand.PING))
return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.PING);
if(map.IsAvailable(RedisCommand.TIME))
return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.TIME);
if (map.IsAvailable(RedisCommand.ECHO))
return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.ECHO, RedisLiterals.PING);
// as our fallback, we'll do something odd... we'll treat a key like a value, out of sheer desperation
// note: this usually means: twemproxy - in which case we're fine anyway, since the proxy does the routing
return ResultProcessor.TimingProcessor.CreateMessage(0, flags, RedisCommand.EXISTS, (RedisValue)multiplexer.UniqueId);
}
ConnectionMultiplexer IRedisAsync.Multiplexer { get { return multiplexer; } }
public virtual TimeSpan Ping(CommandFlags flags = CommandFlags.None)
{
var msg = GetTimerMessage(flags);
......@@ -59,15 +45,16 @@ public override string ToString()
return multiplexer.ToString();
}
public void Wait(Task task)
{
multiplexer.Wait(task);
}
public bool TryWait(Task task)
{
return task.Wait(multiplexer.TimeoutMilliseconds);
}
public void Wait(Task task)
{
multiplexer.Wait(task);
}
public T Wait<T>(Task<T> task)
{
return multiplexer.Wait(task);
......@@ -126,7 +113,7 @@ protected void WhenAlwaysOrExistsOrNotExists(When when)
protected void WhenAlwaysOrNotExists(When when)
{
switch(when)
switch (when)
{
case When.Always:
case When.NotExists:
......@@ -135,5 +122,20 @@ protected void WhenAlwaysOrNotExists(When when)
throw new ArgumentException(when + " is not valid in this context; the permitted values are: Always, NotExists");
}
}
private ResultProcessor.TimingProcessor.TimerMessage GetTimerMessage(CommandFlags flags)
{
// do the best we can with available commands
var map = multiplexer.CommandMap;
if(map.IsAvailable(RedisCommand.PING))
return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.PING);
if(map.IsAvailable(RedisCommand.TIME))
return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.TIME);
if (map.IsAvailable(RedisCommand.ECHO))
return ResultProcessor.TimingProcessor.CreateMessage(-1, flags, RedisCommand.ECHO, RedisLiterals.PING);
// as our fallback, we'll do something odd... we'll treat a key like a value, out of sheer desperation
// note: this usually means: twemproxy - in which case we're fine anyway, since the proxy does the routing
return ResultProcessor.TimingProcessor.CreateMessage(0, flags, RedisCommand.EXISTS, (RedisValue)multiplexer.UniqueId);
}
}
}
......@@ -9,6 +9,15 @@ namespace StackExchange.Redis
public struct RedisChannel : IEquatable<RedisChannel>
{
internal static readonly RedisChannel[] EmptyArray = new RedisChannel[0];
private readonly byte[] value;
private RedisChannel(byte[] value)
{
this.value = value;
}
/// <summary>
/// Indicates whether the channel-name is either null or a zero-length value
/// </summary>
......@@ -20,33 +29,6 @@ public bool IsNullOrEmpty
}
}
internal RedisChannel Clone()
{
byte[] clone = value == null ? null : (byte[])value.Clone();
return clone;
}
internal bool Contains(byte value)
{
return this.value != null && Array.IndexOf(this.value, value) >= 0;
}
internal static bool AssertStarts(byte[] value, byte[] expected)
{
for (int i = 0; i < expected.Length; i++)
{
if (expected[i] != value[i]) return false;
}
return true;
}
internal static readonly RedisChannel[] EmptyArray = new RedisChannel[0];
private readonly byte[] value;
private RedisChannel(byte[] value)
{
this.value = value;
}
internal bool IsNull
{
get { return value == null; }
......@@ -178,12 +160,31 @@ public override string ToString()
return ((string)this) ?? "(null)";
}
internal static bool AssertStarts(byte[] value, byte[] expected)
{
for (int i = 0; i < expected.Length; i++)
{
if (expected[i] != value[i]) return false;
}
return true;
}
internal RedisChannel Assert()
{
if (IsNull) throw new ArgumentException("A null key is not valid in this context");
return this;
}
internal RedisChannel Clone()
{
byte[] clone = value == null ? null : (byte[])value.Clone();
return clone;
}
internal bool Contains(byte value)
{
return this.value != null && Array.IndexOf(this.value, value) >= 0;
}
/// <summary>
/// Create a channel name from a String
/// </summary>
......
......@@ -146,6 +146,11 @@ public override string ToString()
return ((string)this) ?? "(null)";
}
internal RedisValue AsRedisValue()
{
return value;
}
internal RedisKey Assert()
{
if (IsNull) throw new ArgumentException("A null key is not valid in this context");
......@@ -191,10 +196,5 @@ internal RedisKey Assert()
return BitConverter.ToString(arr);
}
}
internal RedisValue AsRedisValue()
{
return value;
}
}
}
......@@ -64,10 +64,8 @@ public static readonly RedisValue
Wildcard = "*";
public static readonly byte[] BytesOK = Encoding.UTF8.GetBytes("OK");
public static readonly byte[] ByteWildcard = { (byte)'*' };
public static readonly byte[] BytesPONG = Encoding.UTF8.GetBytes("PONG");
public static readonly byte[] ByteWildcard = { (byte)'*' };
internal static RedisValue Get(Bitwise operation)
{
switch(operation)
......
......@@ -159,12 +159,11 @@ internal static RedisResult TryCreate(PhysicalConnection connection, RawResult r
internal abstract RedisKey[] AsRedisKeyArray();
internal abstract RedisResult[] AsRedisResultArray();
internal abstract RedisValue AsRedisValue();
internal abstract RedisValue[] AsRedisValueArray();
internal abstract RedisResult[] AsRedisResultArray();
internal abstract string AsString();
internal abstract string[] AsStringArray();
private sealed class ArrayRedisResult : RedisResult
......@@ -250,6 +249,8 @@ internal override RedisKey AsRedisKey()
internal override RedisKey[] AsRedisKeyArray() { return Array.ConvertAll(value, x => x.AsRedisKey()); }
internal override RedisResult[] AsRedisResultArray() { return value; }
internal override RedisValue AsRedisValue()
{
if (value.Length == 1) return value[0].AsRedisValue();
......@@ -264,8 +265,6 @@ internal override string AsString()
throw new InvalidCastException();
}
internal override string[] AsStringArray() { return Array.ConvertAll(value, x => x.AsString()); }
internal override RedisResult[] AsRedisResultArray() { return value; }
}
private sealed class ErrorRedisResult : RedisResult
......@@ -309,13 +308,14 @@ public ErrorRedisResult(string value)
internal override RedisKey[] AsRedisKeyArray() { throw new RedisServerException(value); }
internal override RedisResult[] AsRedisResultArray() { throw new RedisServerException(value); }
internal override RedisValue AsRedisValue() { throw new RedisServerException(value); }
internal override RedisValue[] AsRedisValueArray() { throw new RedisServerException(value); }
internal override string AsString() { throw new RedisServerException(value); }
internal override string[] AsStringArray() { throw new RedisServerException(value); }
internal override RedisResult[] AsRedisResultArray() { throw new RedisServerException(value); }
}
private sealed class SingleRedisResult : RedisResult
......@@ -358,13 +358,14 @@ public SingleRedisResult(RedisValue value)
internal override RedisKey[] AsRedisKeyArray() { return new[] { AsRedisKey() }; }
internal override RedisResult[] AsRedisResultArray() { throw new InvalidCastException(); }
internal override RedisValue AsRedisValue() { return value; }
internal override RedisValue[] AsRedisValueArray() { return new[] { AsRedisValue() }; }
internal override string AsString() { return (string)value; }
internal override string[] AsStringArray() { return new[] { AsString() }; }
internal override RedisResult[] AsRedisResultArray() { throw new InvalidCastException(); }
}
}
}
......@@ -172,13 +172,6 @@ public TransactionMessage(int db, CommandFlags flags, List<ConditionResult> cond
this.conditions = (conditions == null || conditions.Count == 0) ? NixConditions : conditions.ToArray();
}
public override void AppendStormLog(StringBuilder sb)
{
base.AppendStormLog(sb);
if (conditions.Length != 0) sb.Append(", ").Append(conditions.Length).Append(" conditions");
sb.Append(", ").Append(operations.Length).Append(" operations");
}
public QueuedMessage[] InnerOperations { get { return operations; } }
public bool IsAborted
......@@ -186,6 +179,12 @@ public bool IsAborted
get { return command != RedisCommand.EXEC; }
}
public override void AppendStormLog(StringBuilder sb)
{
base.AppendStormLog(sb);
if (conditions.Length != 0) sb.Append(", ").Append(conditions.Length).Append(" conditions");
sb.Append(", ").Append(operations.Length).Append(" operations");
}
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{
int slot = ServerSelectionStrategy.NoSlot;
......
using System;
using System.Net;
using System.Net;
using System.Text;
namespace StackExchange.Redis
......@@ -27,17 +26,15 @@ internal ServerCounters(EndPoint endpoint)
/// </summary>
public ConnectionCounters Interactive { get; private set; }
/// <summary>
/// Counters associated with the subscription (pub-sub) connection
/// </summary>
public ConnectionCounters Subscription { get; private set; }
/// <summary>
/// Counters associated with other ambient activity
/// </summary>
public ConnectionCounters Other { get; private set; }
/// <summary>
/// Counters associated with the subscription (pub-sub) connection
/// </summary>
public ConnectionCounters Subscription { get; private set; }
/// <summary>
/// Indicates the total number of outstanding items against this server
/// </summary>
......
......@@ -6,7 +6,6 @@
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
namespace StackExchange.Redis
......@@ -30,6 +29,7 @@ internal sealed partial class ServerEndPoint : IDisposable
private readonly EndPoint endpoint;
private readonly Hashtable knownScripts = new Hashtable(StringComparer.Ordinal);
private readonly ConnectionMultiplexer multiplexer;
private int databases, writeEverySeconds;
......@@ -86,8 +86,6 @@ public bool IsConnected
public bool IsSlave { get { return isSlave; } set { SetConfig(ref isSlave, value); } }
public int WriteEverySeconds { get { return writeEverySeconds; } set { SetConfig(ref writeEverySeconds, value); } }
public long OperationCount
{
get
......@@ -109,6 +107,7 @@ public long OperationCount
public Version Version { get { return version; } set { SetConfig(ref version, value); } }
public int WriteEverySeconds { get { return writeEverySeconds; } set { SetConfig(ref writeEverySeconds, value); } }
internal ConnectionMultiplexer Multiplexer { get { return multiplexer; } }
public void ClearUnselectable(UnselectableFlags flags)
......@@ -160,6 +159,11 @@ public PhysicalBridge GetBridge(RedisCommand command, bool create = true)
}
}
public RedisFeatures GetFeatures()
{
return new RedisFeatures(version);
}
public void SetClusterConfiguration(ClusterConfiguration configuration)
{
......@@ -219,6 +223,14 @@ internal void Activate(ConnectionType type)
GetBridge(type, true);
}
internal void AddScript(string script, byte[] hash)
{
lock (knownScripts)
{
knownScripts[script] = hash;
}
}
internal void AutoConfigure(PhysicalConnection connection)
{
if (serverType == ServerType.Twemproxy)
......@@ -299,6 +311,14 @@ internal Task Close()
return result;
}
internal void FlushScripts()
{
lock (knownScripts)
{
knownScripts.Clear();
}
}
internal ServerCounters GetCounters()
{
var counters = new ServerCounters(endpoint);
......@@ -309,6 +329,75 @@ internal ServerCounters GetCounters()
return counters;
}
internal int GetOutstandingCount(RedisCommand command, out int inst, out int qu, out int qs, out int qc, out int wr, out int wq)
{
var bridge = GetBridge(command, false);
if (bridge == null)
{
return inst = qu = qs = qc = wr = wq = 0;
}
return bridge.GetOutstandingCount(out inst, out qu, out qs, out qc, out wr, out wq);
}
internal string GetProfile()
{
var sb = new StringBuilder();
sb.Append("Circular op-count snapshot; int:");
var tmp = interactive;
if (tmp != null) tmp.AppendProfile(sb);
sb.Append("; sub:");
tmp = subscription;
if (tmp != null) tmp.AppendProfile(sb);
return sb.ToString();
}
internal byte[] GetScriptHash(string script)
{
return (byte[])knownScripts[script];
}
internal string GetStormLog(RedisCommand command)
{
var bridge = GetBridge(command);
return bridge == null ? null : bridge.GetStormLog();
}
internal Message GetTracerMessage(bool assertIdentity)
{
// different configurations block certain commands, as can ad-hoc local configurations, so
// we'll do the best with what we have available.
// note that the muxer-ctor asserts that one of ECHO, PING, TIME of GET is available
// see also: TracerProcessor
var map = multiplexer.CommandMap;
Message msg;
const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.FireAndForget;
if (assertIdentity && map.IsAvailable(RedisCommand.ECHO))
{
msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)multiplexer.UniqueId);
}
else if (map.IsAvailable(RedisCommand.PING))
{
msg = Message.Create(-1, flags, RedisCommand.PING);
}
else if (map.IsAvailable(RedisCommand.TIME))
{
msg = Message.Create(-1, flags, RedisCommand.TIME);
}
else if (!assertIdentity && map.IsAvailable(RedisCommand.ECHO))
{
// we'll use echo as a PING substitute if it is all we have (in preference to EXISTS)
msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)multiplexer.UniqueId);
}
else
{
map.AssertAvailable(RedisCommand.EXISTS);
msg = Message.Create(0, flags, RedisCommand.EXISTS, (RedisValue)multiplexer.UniqueId);
}
msg.SetInternalCall();
return msg;
}
internal bool IsSelectable(RedisCommand command)
{
var bridge = unselectableReasons == 0 ? GetBridge(command, false) : null;
......@@ -391,6 +480,11 @@ internal void ReportNextFailure()
if (tmp != null) tmp.ReportNextFailure();
}
internal Task<bool> SendTracer()
{
return QueueDirectAsync(GetTracerMessage(false), ResultProcessor.Tracer);
}
internal string Summary()
{
var sb = new StringBuilder(Format.ToString(endpoint))
......@@ -423,11 +517,6 @@ internal string Summary()
}
return sb.ToString();
}
public RedisFeatures GetFeatures()
{
return new RedisFeatures(version);
}
internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection, Message message, ResultProcessor<T> processor)
{
if (message != null)
......@@ -521,94 +610,5 @@ private void SetConfig<T>(ref T field, T value, [CallerMemberName] string caller
multiplexer.ReconfigureIfNeeded(endpoint, false, caller);
}
}
internal Task<bool> SendTracer()
{
return QueueDirectAsync(GetTracerMessage(false), ResultProcessor.Tracer);
}
internal Message GetTracerMessage(bool assertIdentity)
{
// different configurations block certain commands, as can ad-hoc local configurations, so
// we'll do the best with what we have available.
// note that the muxer-ctor asserts that one of ECHO, PING, TIME of GET is available
// see also: TracerProcessor
var map = multiplexer.CommandMap;
Message msg;
const CommandFlags flags = CommandFlags.NoRedirect | CommandFlags.FireAndForget;
if (assertIdentity && map.IsAvailable(RedisCommand.ECHO))
{
msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)multiplexer.UniqueId);
}
else if (map.IsAvailable(RedisCommand.PING))
{
msg = Message.Create(-1, flags, RedisCommand.PING);
}
else if (map.IsAvailable(RedisCommand.TIME))
{
msg = Message.Create(-1, flags, RedisCommand.TIME);
}
else if(!assertIdentity && map.IsAvailable(RedisCommand.ECHO))
{
// we'll use echo as a PING substitute if it is all we have (in preference to EXISTS)
msg = Message.Create(-1, flags, RedisCommand.ECHO, (RedisValue)multiplexer.UniqueId);
}
else
{
map.AssertAvailable(RedisCommand.EXISTS);
msg = Message.Create(0, flags, RedisCommand.EXISTS, (RedisValue)multiplexer.UniqueId);
}
msg.SetInternalCall();
return msg;
}
internal int GetOutstandingCount(RedisCommand command, out int inst, out int qu, out int qs, out int qc, out int wr, out int wq)
{
var bridge = GetBridge(command, false);
if(bridge == null)
{
return inst = qu = qs = qc = wr = wq = 0;
}
return bridge.GetOutstandingCount(out inst, out qu, out qs, out qc, out wr, out wq);
}
internal string GetStormLog(RedisCommand command)
{
var bridge = GetBridge(command);
return bridge == null ? null : bridge.GetStormLog();
}
internal string GetProfile()
{
var sb = new StringBuilder();
sb.Append("Circular op-count snapshot; int:");
var tmp = interactive;
if (tmp != null) tmp.AppendProfile(sb);
sb.Append("; sub:");
tmp = subscription;
if (tmp != null) tmp.AppendProfile(sb);
return sb.ToString();
}
private readonly Hashtable knownScripts = new Hashtable(StringComparer.Ordinal);
internal byte[] GetScriptHash(string script)
{
return (byte[])knownScripts[script];
}
internal void AddScript(string script, byte[] hash)
{
lock(knownScripts)
{
knownScripts[script] = hash;
}
}
internal void FlushScripts()
{
lock(knownScripts)
{
knownScripts.Clear();
}
}
}
}
......@@ -6,6 +6,7 @@ namespace StackExchange.Redis
{
internal sealed class ServerSelectionStrategy
{
public const int NoSlot = -1, MultipleSlots = -2;
private const int RedisClusterSlotCount = 16384;
static readonly ushort[] crc16tab =
{
......@@ -57,26 +58,6 @@ public ServerSelectionStrategy(ConnectionMultiplexer multiplexer)
public ServerType ServerType { get { return serverType; } set { serverType = value; } }
internal int TotalSlots { get { return RedisClusterSlotCount; } }
public const int NoSlot = -1, MultipleSlots = -2;
internal int CombineSlot(int oldSlot, int newSlot)
{
if (oldSlot == MultipleSlots || newSlot == NoSlot) return oldSlot;
if (oldSlot == NoSlot) return newSlot;
return oldSlot == newSlot ? oldSlot : MultipleSlots;
}
internal int CombineSlot(int oldSlot, RedisKey key)
{
byte[] blob = key.Value;
if (oldSlot == MultipleSlots || (blob = key.Value) == null) return oldSlot;
int newSlot = HashSlot(blob);
if (oldSlot == NoSlot) return newSlot;
return oldSlot == newSlot ? oldSlot : MultipleSlots;
}
/// <summary>
/// Computes the hash-slot that would be used by the given key
/// </summary>
......@@ -104,6 +85,7 @@ public unsafe int HashSlot(byte[] key)
}
}
}
public ServerEndPoint Select(Message message)
{
if (message == null) throw new ArgumentNullException("message");
......@@ -121,44 +103,12 @@ public ServerEndPoint Select(Message message)
}
return Select(slot, message.Command, message.Flags);
}
public ServerEndPoint Select(int db, RedisCommand command, RedisKey key, CommandFlags flags)
{
int slot = serverType == ServerType.Cluster ? HashSlot(key) : NoSlot;
return Select(slot, command, flags);
}
private ServerEndPoint Select(int slot, RedisCommand command, CommandFlags flags)
{
flags = Message.GetMasterSlaveFlags(flags); // only intersted in master/slave preferences
ServerEndPoint[] arr;
if (slot == NoSlot || (arr = map) == null) return Any(command, flags);
ServerEndPoint endpoint = arr[slot], testing;
// but: ^^^ is the MASTER slots; if we want a slave, we need to do some thinking
if (endpoint != null)
{
switch (flags)
{
case CommandFlags.DemandSlave:
return FindSlave(endpoint, command) ?? Any(command, flags);
case CommandFlags.PreferSlave:
testing = FindSlave(endpoint, command);
if (testing != null) return testing;
break;
case CommandFlags.DemandMaster:
return FindMaster(endpoint, command) ?? Any(command, flags);
case CommandFlags.PreferMaster:
testing = FindMaster(endpoint, command);
if (testing != null) return testing;
break;
}
if (endpoint.IsSelectable(command)) return endpoint;
}
return Any(command, flags);
}
public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isMoved)
{
......@@ -225,6 +175,21 @@ public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isM
}
}
internal int CombineSlot(int oldSlot, int newSlot)
{
if (oldSlot == MultipleSlots || newSlot == NoSlot) return oldSlot;
if (oldSlot == NoSlot) return newSlot;
return oldSlot == newSlot ? oldSlot : MultipleSlots;
}
internal int CombineSlot(int oldSlot, RedisKey key)
{
byte[] blob = key.Value;
if (oldSlot == MultipleSlots || (blob = key.Value) == null) return oldSlot;
int newSlot = HashSlot(blob);
if (oldSlot == NoSlot) return newSlot;
return oldSlot == newSlot ? oldSlot : MultipleSlots;
}
internal int CountCoveredSlots()
{
var arr = map;
......@@ -284,7 +249,7 @@ private ServerEndPoint FindSlave(ServerEndPoint endpoint, RedisCommand command)
private ServerEndPoint[] MapForMutation()
{
var arr = map;
if(arr == null)
if (arr == null)
{
lock (this)
{
......@@ -294,5 +259,37 @@ private ServerEndPoint[] MapForMutation()
}
return arr;
}
private ServerEndPoint Select(int slot, RedisCommand command, CommandFlags flags)
{
flags = Message.GetMasterSlaveFlags(flags); // only intersted in master/slave preferences
ServerEndPoint[] arr;
if (slot == NoSlot || (arr = map) == null) return Any(command, flags);
ServerEndPoint endpoint = arr[slot], testing;
// but: ^^^ is the MASTER slots; if we want a slave, we need to do some thinking
if (endpoint != null)
{
switch (flags)
{
case CommandFlags.DemandSlave:
return FindSlave(endpoint, command) ?? Any(command, flags);
case CommandFlags.PreferSlave:
testing = FindSlave(endpoint, command);
if (testing != null) return testing;
break;
case CommandFlags.DemandMaster:
return FindMaster(endpoint, command) ?? Any(command, flags);
case CommandFlags.PreferMaster:
testing = FindMaster(endpoint, command);
if (testing != null) return testing;
break;
}
if (endpoint.IsSelectable(command)) return endpoint;
}
return Any(command, flags);
}
}
}
#if MONO
namespace StackExchange.Redis
{
partial class SocketManager
{
internal const SocketMode DefaultSocketMode = SocketMode.Async;
partial void OnAddRead(System.Net.Sockets.Socket socket, ISocketCallback callback)
{
throw new System.NotSupportedException();
}
}
}
#endif
\ No newline at end of file
This diff is collapsed.
......@@ -81,6 +81,14 @@ static TaskSource()
IsSyncSafe = t => false; // assume: not
}
/// <summary>
/// Create a new TaskCompletion source
/// </summary>
public static TaskCompletionSource<T> Create<T>(object asyncState)
{
return new TaskCompletionSource<T>(asyncState);
}
/// <summary>
/// Create a new TaskCompletionSource that will not allow result-setting threads to be hijacked
/// </summary>
......@@ -90,12 +98,5 @@ public static TaskCompletionSource<T> CreateDenyExecSync<T>(object asyncState)
DenyExecSync(source.Task);
return source;
}
/// <summary>
/// Create a new TaskCompletion source
/// </summary>
public static TaskCompletionSource<T> Create<T>(object asyncState)
{
return new TaskCompletionSource<T>(asyncState);
}
}
}
@rd /s /q StackExchange.Redis\bin\mono
@rd /s /q BasicTest\bin\mono
@md StackExchange.Redis\bin\mono
@md BasicTest\bin\mono
@rd /s /q StackExchange.Redis\bin\mono 1>nul 2>nul
@rd /s /q BasicTest\bin\mono 1>nul 2>nul
@md StackExchange.Redis\bin\mono 1>nul 2>nul
@md BasicTest\bin\mono 1>nul 2>nul
@echo Building StackExchange.Redis.dll ...
@call mcs -recurse:StackExchange.Redis\*.cs -out:StackExchange.Redis\bin\mono\StackExchange.Redis.dll -target:library -unsafe+ -o+ -r:System.IO.Compression.dll -d:MONO
@echo Building BasicTest.exe ...
......
@rd /s /q StackExchange.Redis\bin\Release 1>nul 2>nul
@rd /s /q BasicTest\bin\Release 1>nul 2>nul
@md StackExchange.Redis\bin\Release 1>nul 2>nul
@md BasicTest\bin\Release 1>nul 2>nul
@echo Building StackExchange.Redis.dll ...
@call csc /out:StackExchange.Redis\bin\Release\StackExchange.Redis.dll /target:library /unsafe+ /o+ /r:System.IO.Compression.dll /recurse:StackExchange.Redis\*.cs
@echo Building BasicTest.exe ...
@call csc /out:BasicTest\bin\Release\BasicTest.exe /target:exe -o+ /r:StackExchange.Redis\bin\Release\StackExchange.Redis.dll BasicTest\Program.cs
@copy StackExchange.Redis\bin\Release\*.* BasicTest\bin\Release > nul
@echo .
@echo Running basic test (.NET) ...
@call BasicTest\bin\Release\BasicTest.exe 100000
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment