Commit b91987d3 authored by Marc Gravell's avatar Marc Gravell

investigate ConnectToSSLServer using azure as a source; some very interesting discoveries:

key bugs resolved:

- make sure that we only push things into _writtenAwaitingResponse until we have actually written them; they are allowed to throw certain errors *before* write, which breaks the queue completely
- ensire that MOVED redirects are sents as "internal call"; this means that if this is a new server, it can be queued rather than dropped
- allow MOVED code to select a server that isn't "up" yet

additional tweaks:

- change accessibility on WriteImpl to ensure everything goes via WriteTo
- renamed some methods to clarify what the lock status is: ...InsideWriteLock, ...TakingWriteLock, etc
- fix bug where ExportConfiguration can fail if a result is nil (although this might be a false outcome)
- why did StringGetWithExpiryMessage specify NoRedirect?
- clarify messages for ProtocolFailure and MOVED
parent c9ae36ce
......@@ -3,6 +3,7 @@
using System.Diagnostics;
using System.IO;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
......@@ -44,29 +45,41 @@ public void ConnectToAzure(int? port, bool ssl)
[InlineData(false, false)]
[InlineData(true, false)]
[InlineData(true, true)]
public void ConnectToSSLServer(bool useSsl, bool specifyHost)
public async Task ConnectToSSLServer(bool useSsl, bool specifyHost)
{
Skip.IfNoConfig(nameof(TestConfig.Config.SslServer), TestConfig.Current.SslServer);
var server = TestConfig.Current.SslServer;
int? port = TestConfig.Current.SslPort;
string password = "";
bool isAzure = false;
if (string.IsNullOrWhiteSpace(server) && useSsl)
{
// we can bounce it past azure instead?
server = TestConfig.Current.AzureCacheServer;
password = TestConfig.Current.AzureCachePassword;
port = null;
isAzure = true;
}
Skip.IfNoConfig(nameof(TestConfig.Config.SslServer), server);
var config = new ConfigurationOptions
{
CommandMap = CommandMap.Create( // looks like "config" is disabled
new Dictionary<string, string>
{
["config"] = null,
["cluster"] = null
}
),
EndPoints = { { TestConfig.Current.SslServer, TestConfig.Current.SslPort } },
AllowAdmin = true,
SyncTimeout = Debugger.IsAttached ? int.MaxValue : 5000
SyncTimeout = Debugger.IsAttached ? int.MaxValue : 5000,
Password = password,
};
var map = new Dictionary<string, string>();
map["config"] = null; // don't rely on config working
if (!isAzure) map["cluster"] = null;
config.CommandMap = CommandMap.Create(map);
if (port != null) config.EndPoints.Add(server, port.Value);
else config.EndPoints.Add(server);
if (useSsl)
{
config.Ssl = useSsl;
if (specifyHost)
{
config.SslHost = TestConfig.Current.SslServer;
config.SslHost = server;
}
config.CertificateValidation += (sender, cert, chain, errors) =>
{
......@@ -90,7 +103,7 @@ public void ConnectToSSLServer(bool useSsl, bool specifyHost)
muxer.ConnectionFailed += OnConnectionFailed;
muxer.InternalError += OnInternalError;
var db = muxer.GetDatabase();
db.Ping();
await db.PingAsync();
using (var file = File.Create("ssl-" + useSsl + "-" + specifyHost + ".zip"))
{
muxer.ExportConfiguration(file);
......@@ -99,14 +112,22 @@ public void ConnectToSSLServer(bool useSsl, bool specifyHost)
const int AsyncLoop = 2000;
// perf; async
db.KeyDelete(key, CommandFlags.FireAndForget);
await db.KeyDeleteAsync(key);
var watch = Stopwatch.StartNew();
for (int i = 0; i < AsyncLoop; i++)
{
db.StringIncrement(key, flags: CommandFlags.FireAndForget);
try
{
await db.StringIncrementAsync(key, flags: CommandFlags.FireAndForget);
}
catch (Exception ex)
{
Output.WriteLine($"Failure on i={i}: {ex.Message}");
throw;
}
}
// need to do this inside the timer to measure the TTLB
long value = (long)db.StringGet(key);
long value = (long)await db.StringGetAsync(key);
watch.Stop();
Assert.Equal(AsyncLoop, value);
Output.WriteLine("F&F: {0} INCR, {1:###,##0}ms, {2} ops/s; final value: {3}",
......@@ -116,7 +137,7 @@ public void ConnectToSSLServer(bool useSsl, bool specifyHost)
value);
// perf: sync/multi-threaded
TestConcurrent(db, key, 30, 10);
// TestConcurrent(db, key, 30, 10);
//TestConcurrent(db, key, 30, 20);
//TestConcurrent(db, key, 30, 30);
//TestConcurrent(db, key, 30, 40);
......
......@@ -297,7 +297,7 @@ public ConditionMessage(Condition condition, int db, CommandFlags flags, RedisCo
this.value = value; // note no assert here
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
if (value.IsNull)
{
......
......@@ -280,9 +280,16 @@ public void ExportConfiguration(Stream destination, ExportOptions options = Expo
{
Write<ClientInfo[]>(zip, prefix + "/clients.txt", tasks[index++], (clients, writer) =>
{
foreach (var client in clients)
if (clients == null)
{
writer.WriteLine(client.Raw);
writer.WriteLine(NoContent);
}
else
{
foreach (var client in clients)
{
writer.WriteLine(client.Raw);
}
}
});
}
......@@ -445,14 +452,21 @@ internal void CheckMessage(Message message)
throw ExceptionFactory.AdminModeNotEnabled(IncludeDetailInExceptions, message.Command, message, null);
CommandMap.AssertAvailable(message.Command);
}
const string NoContent = "(no content)";
private static void WriteNormalizingLineEndings(string source, StreamWriter writer)
{
using (var reader = new StringReader(source))
if (source == null)
{
string line;
while ((line = reader.ReadLine()) != null)
writer.WriteLine(line); // normalize line endings
writer.WriteLine(NoContent);
}
else
{
using (var reader = new StringReader(source))
{
string line;
while ((line = reader.ReadLine()) != null)
writer.WriteLine(line); // normalize line endings
}
}
}
......
......@@ -313,7 +313,7 @@ partial class PhysicalConnection
{
name = name.Replace(c, '_');
}
pipe = new LoggingPipe(pipe, $"{name}.in", $"{name}.out", mgr);
pipe = new LoggingPipe(pipe, $"{name}.in.resp", $"{name}.out.resp", mgr);
}
}
#endif
......
......@@ -31,14 +31,14 @@ private LoggingMessage(TextWriter log, Message tail) : base(tail.Db, tail.Flags,
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => tail.GetHashSlot(serverSelectionStrategy);
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
try
{
physical.Multiplexer.LogLocked(log, "Writing to {0}: {1}", physical.Bridge, tail.CommandAndKey);
}
catch { }
tail.WriteImpl(physical);
tail.WriteTo(physical);
}
public TextWriter Log => log;
......@@ -204,7 +204,6 @@ internal void SetScriptUnavailable()
public bool IsInternalCall => (flags & InternalCallFlag) != 0;
public ResultBox ResultBox => resultBox;
public static Message Create(int db, CommandFlags flags, RedisCommand command)
{
if (command == RedisCommand.SELECT)
......@@ -648,7 +647,7 @@ internal void SetSource<T>(ResultBox<T> resultBox, ResultProcessor<T> resultProc
this.resultProcessor = resultProcessor;
}
internal abstract void WriteImpl(PhysicalConnection physical);
protected abstract void WriteImpl(PhysicalConnection physical);
internal void WriteTo(PhysicalConnection physical)
{
......@@ -702,7 +701,7 @@ private sealed class CommandChannelMessage : CommandChannelBase
{
public CommandChannelMessage(int db, CommandFlags flags, RedisCommand command, RedisChannel channel) : base(db, flags, command, channel)
{ }
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(Channel);
......@@ -718,7 +717,7 @@ public CommandChannelValueMessage(int db, CommandFlags flags, RedisCommand comma
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(Channel);
......@@ -744,7 +743,7 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
return serverSelectionStrategy.CombineSlot(slot, key2);
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(Key);
......@@ -768,7 +767,7 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
return serverSelectionStrategy.CombineSlot(slot, key1);
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(Key);
......@@ -798,7 +797,7 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, keys.Length + 1);
physical.Write(Key);
......@@ -818,7 +817,7 @@ public CommandKeyKeyValueMessage(int db, CommandFlags flags, RedisCommand comman
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(Key);
......@@ -831,7 +830,7 @@ private sealed class CommandKeyMessage : CommandKeyBase
{
public CommandKeyMessage(int db, CommandFlags flags, RedisCommand command, RedisKey key) : base(db, flags, command, key)
{ }
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(Key);
......@@ -850,7 +849,7 @@ public CommandValuesMessage(int db, CommandFlags flags, RedisCommand command, Re
this.values = values;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, values.Length);
for (int i = 0; i < values.Length; i++)
......@@ -882,7 +881,7 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, keys.Length);
for (int i = 0; i < keys.Length; i++)
......@@ -901,7 +900,7 @@ public CommandKeyValueMessage(int db, CommandFlags flags, RedisCommand command,
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(Key);
......@@ -930,7 +929,7 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
return serverSelectionStrategy.CombineSlot(slot, key1);
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, values.Length + 2);
physical.Write(Key);
......@@ -951,7 +950,7 @@ public CommandKeyValuesMessage(int db, CommandFlags flags, RedisCommand command,
this.values = values;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, values.Length + 1);
physical.Write(Key);
......@@ -970,7 +969,7 @@ public CommandKeyValueValueMessage(int db, CommandFlags flags, RedisCommand comm
this.value1 = value1;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(Key);
......@@ -992,7 +991,7 @@ public CommandKeyValueValueValueMessage(int db, CommandFlags flags, RedisCommand
this.value2 = value2;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 4);
physical.Write(Key);
......@@ -1017,7 +1016,7 @@ public CommandKeyValueValueValueValueMessage(int db, CommandFlags flags, RedisCo
this.value3 = value3;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 5);
physical.Write(Key);
......@@ -1031,7 +1030,7 @@ internal override void WriteImpl(PhysicalConnection physical)
private sealed class CommandMessage : Message
{
public CommandMessage(int db, CommandFlags flags, RedisCommand command) : base(db, flags, command) { }
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 0);
}
......@@ -1058,7 +1057,7 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, values.Length);
for (int i = 0; i < values.Length; i++)
......@@ -1077,7 +1076,7 @@ public CommandValueChannelMessage(int db, CommandFlags flags, RedisCommand comma
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(value);
......@@ -1101,7 +1100,7 @@ public override void AppendStormLog(StringBuilder sb)
sb.Append(" (").Append((string)value).Append(')');
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(value);
......@@ -1118,7 +1117,7 @@ public CommandValueMessage(int db, CommandFlags flags, RedisCommand command, Red
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(value);
......@@ -1136,7 +1135,7 @@ public CommandValueValueMessage(int db, CommandFlags flags, RedisCommand command
this.value1 = value1;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(value0);
......@@ -1157,7 +1156,7 @@ public CommandValueValueValueMessage(int db, CommandFlags flags, RedisCommand co
this.value2 = value2;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 3);
physical.Write(value0);
......@@ -1183,7 +1182,7 @@ public CommandValueValueValueValueValueMessage(int db, CommandFlags flags, Redis
this.value4 = value4;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 5);
physical.Write(value0);
......@@ -1200,7 +1199,7 @@ public SelectMessage(int db, CommandFlags flags) : base(db, flags, RedisCommand.
{
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 1);
physical.Write(Db);
......
......@@ -136,7 +136,7 @@ public WriteResult TryWrite(Message message, bool isSlave)
var physical = this.physical;
if (physical == null) return WriteResult.NoConnectionAvailable;
var result = WriteMessageDirect(physical, message);
var result = WriteMessageTakingWriteLock(physical, message);
LogNonPreferred(message.Flags, isSlave);
return result;
......@@ -322,7 +322,7 @@ private void WritePendingBacklog(PhysicalConnection connection)
do
{
next = DequeueNextPendingBacklog();
if (next != null) WriteMessageDirect(connection, next);
if (next != null) WriteMessageTakingWriteLock(connection, next);
} while (next != null);
}
}
......@@ -492,7 +492,7 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave)
{ // deliberately not taking a single lock here; we don't care if
// other threads manage to interleave - in fact, it would be desirable
// (to avoid a batch monopolising the connection)
WriteMessageDirect(physical, message);
WriteMessageTakingWriteLock(physical, message);
LogNonPreferred(message.Flags, isSlave);
}
return true;
......@@ -505,7 +505,7 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave)
/// </summary>
/// <param name="physical">The phsyical connection to write to.</param>
/// <param name="next">The message to be written.</param>
internal WriteResult WriteMessageDirect(PhysicalConnection physical, Message next)
internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Message next)
{
Trace("Writing: " + next);
next.SetEnqueued();
......@@ -519,10 +519,10 @@ internal WriteResult WriteMessageDirect(PhysicalConnection physical, Message nex
var messageIsSent = false;
if (next is IMultiMessage)
{
SelectDatabase(physical, next); // need to switch database *before* the transaction
SelectDatabaseInsideWriteLock(physical, next); // need to switch database *before* the transaction
foreach (var subCommand in ((IMultiMessage)next).GetMessages(physical))
{
result = WriteMessageToServer(physical, subCommand);
result = WriteMessageToServerInsideWriteLock(physical, subCommand);
if (result != WriteResult.Success)
{
// we screwed up; abort; note that WriteMessageToServer already
......@@ -545,7 +545,7 @@ internal WriteResult WriteMessageDirect(PhysicalConnection physical, Message nex
}
else
{
result = WriteMessageToServer(physical, next);
result = WriteMessageToServerInsideWriteLock(physical, next);
}
physical.WakeWriterAndCheckForThrottle();
}
......@@ -637,7 +637,7 @@ private void OnInternalError(Exception exception, [CallerMemberName] string orig
Multiplexer.OnInternalError(exception, ServerEndPoint.EndPoint, ConnectionType, origin);
}
private void SelectDatabase(PhysicalConnection connection, Message message)
private void SelectDatabaseInsideWriteLock(PhysicalConnection connection, Message message)
{
int db = message.Db;
if (db >= 0)
......@@ -645,15 +645,15 @@ private void SelectDatabase(PhysicalConnection connection, Message message)
var sel = connection.GetSelectDatabaseCommand(db, message);
if (sel != null)
{
connection.Enqueue(sel);
sel.WriteImpl(connection);
sel.WriteTo(connection);
connection.EnqueueInsideWriteLock(sel);
sel.SetRequestSent();
IncrementOpCount();
}
}
}
private WriteResult WriteMessageToServer(PhysicalConnection connection, Message message)
private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection connection, Message message)
{
if (message == null) return WriteResult.Success; // for some definition of success
......@@ -666,15 +666,15 @@ private WriteResult WriteMessageToServer(PhysicalConnection connection, Message
throw ExceptionFactory.MasterOnly(Multiplexer.IncludeDetailInExceptions, message.Command, message, ServerEndPoint);
}
SelectDatabase(connection, message);
SelectDatabaseInsideWriteLock(connection, message);
if (!connection.TransactionActive)
{
var readmode = connection.GetReadModeCommand(isMasterOnly);
if (readmode != null)
{
connection.Enqueue(readmode);
readmode.WriteTo(connection);
connection.EnqueueInsideWriteLock(readmode);
readmode.SetRequestSent();
IncrementOpCount();
}
......@@ -682,8 +682,8 @@ private WriteResult WriteMessageToServer(PhysicalConnection connection, Message
if (message.IsAsking)
{
var asking = ReusableAskingCommand;
connection.Enqueue(asking);
asking.WriteImpl(connection);
asking.WriteTo(connection);
connection.EnqueueInsideWriteLock(asking);
asking.SetRequestSent();
IncrementOpCount();
}
......@@ -701,8 +701,8 @@ private WriteResult WriteMessageToServer(PhysicalConnection connection, Message
break;
}
connection.Enqueue(message);
message.WriteImpl(connection);
message.WriteTo(connection);
connection.EnqueueInsideWriteLock(message);
message.SetRequestSent();
IncrementOpCount();
......@@ -728,7 +728,7 @@ private WriteResult WriteMessageToServer(PhysicalConnection connection, Message
catch (RedisCommandException ex)
{
Trace("Write failed: " + ex.Message);
message.Fail(ConnectionFailureType.ProtocolFailure, ex);
message.Fail(ConnectionFailureType.InternalFailure, ex);
CompleteSyncOrAsync(message);
// this failed without actually writing; we're OK with that... unless there's a transaction
......
......@@ -57,7 +57,7 @@ private static readonly Message
private readonly ConnectionType connectionType;
// things sent to this physical, but not yet received
private readonly Queue<Message> outstanding = new Queue<Message>();
private readonly Queue<Message> _writtenAwaitingResponse = new Queue<Message>();
private readonly string physicalName;
......@@ -241,12 +241,12 @@ void add(string lk, string sk, string v)
// cleanup
managerState = SocketManager.ManagerState.RecordConnectionFailed_FailOutstanding;
lock (outstanding)
lock (_writtenAwaitingResponse)
{
Bridge.Trace(outstanding.Count != 0, "Failing outstanding messages: " + outstanding.Count);
while (outstanding.Count != 0)
Bridge.Trace(_writtenAwaitingResponse.Count != 0, "Failing outstanding messages: " + _writtenAwaitingResponse.Count);
while (_writtenAwaitingResponse.Count != 0)
{
var next = outstanding.Dequeue();
var next = _writtenAwaitingResponse.Dequeue();
Bridge.Trace("Failing: " + next);
next.SetException(innerException);
Bridge.CompleteSyncOrAsync(next);
......@@ -275,19 +275,20 @@ internal static void IdentifyFailureType(Exception exception, ref ConnectionFail
}
}
internal void Enqueue(Message next)
internal void EnqueueInsideWriteLock(Message next)
{
lock (outstanding)
lock (_writtenAwaitingResponse)
{
outstanding.Enqueue(next);
_writtenAwaitingResponse.Enqueue(next);
if (_writtenAwaitingResponse.Count == 1) Monitor.Pulse(_writtenAwaitingResponse);
}
}
internal void GetCounters(ConnectionCounters counters)
{
lock (outstanding)
lock (_writtenAwaitingResponse)
{
counters.SentItemsAwaitingResponse = outstanding.Count;
counters.SentItemsAwaitingResponse = _writtenAwaitingResponse.Count;
}
counters.Subscriptions = SubscriptionCount;
}
......@@ -367,20 +368,20 @@ internal static Message GetSelectDatabaseCommand(int targetDatabase)
internal int GetSentAwaitingResponseCount()
{
lock (outstanding)
lock (_writtenAwaitingResponse)
{
return outstanding.Count;
return _writtenAwaitingResponse.Count;
}
}
internal void GetStormLog(StringBuilder sb)
{
lock (outstanding)
lock (_writtenAwaitingResponse)
{
if (outstanding.Count == 0) return;
sb.Append("Sent, awaiting response from server: ").Append(outstanding.Count).AppendLine();
if (_writtenAwaitingResponse.Count == 0) return;
sb.Append("Sent, awaiting response from server: ").Append(_writtenAwaitingResponse.Count).AppendLine();
int total = 0;
foreach (var item in outstanding)
foreach (var item in _writtenAwaitingResponse)
{
if (++total >= 500) break;
item.AppendStormLog(sb);
......@@ -991,10 +992,16 @@ private void MatchResult(RawResult result)
}
Multiplexer.Trace("Matching result...", physicalName);
Message msg;
lock (outstanding)
lock (_writtenAwaitingResponse)
{
Multiplexer.Trace(outstanding.Count == 0, "Nothing to respond to!", physicalName);
msg = outstanding.Dequeue();
if (_writtenAwaitingResponse.Count == 0)
{
// we could be racing with the writer, but this *really* shouldn't
// be even remotely close
Monitor.Wait(_writtenAwaitingResponse, 500);
}
Multiplexer.Trace(_writtenAwaitingResponse.Count == 0, "Nothing to respond to!", physicalName);
msg = _writtenAwaitingResponse.Dequeue();
}
Multiplexer.Trace("Response to: " + msg, physicalName);
......
......@@ -693,7 +693,7 @@ public KeyMigrateCommandMessage(int db, RedisKey key, EndPoint toServer, int toD
this.migrateOptions = migrateOptions;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
bool isCopy = (migrateOptions & MigrateOptions.Copy) != 0;
bool isReplace = (migrateOptions & MigrateOptions.Replace) != 0;
......@@ -3141,7 +3141,7 @@ public ScriptLoadMessage(CommandFlags flags, string script)
Script = script ?? throw new ArgumentNullException(nameof(script));
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2);
physical.Write(RedisLiterals.LOAD);
......@@ -3193,7 +3193,7 @@ public ExecuteMessage(int db, CommandFlags flags, string command, ICollection<ob
this.args = args ?? Array.Empty<object>();
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(_command, args.Count);
foreach (object arg in args)
......@@ -3293,7 +3293,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
yield return this;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
if (hexHash != null)
{
......@@ -3351,7 +3351,7 @@ public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
return slot;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 2 + keys.Length + values.Length);
physical.Write(Key);
......@@ -3380,7 +3380,7 @@ private class StringGetWithExpiryMessage : Message.CommandKeyBase, IMultiMessage
private ResultBox<TimeSpan?> box;
public StringGetWithExpiryMessage(int db, CommandFlags flags, RedisCommand ttlCommand, RedisKey key)
: base(db, flags | CommandFlags.NoRedirect /* <== not implemented/tested */, RedisCommand.GET, key)
: base(db, flags, RedisCommand.GET, key)
{
this.ttlCommand = ttlCommand;
}
......@@ -3410,7 +3410,7 @@ public bool UnwrapValue(out TimeSpan? value, out Exception ex)
return false;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(command, 1);
physical.Write(Key);
......
......@@ -146,9 +146,9 @@ public bool WasQueued
set => wasQueued = value;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
Wrapped.WriteImpl(physical);
Wrapped.WriteTo(physical);
Wrapped.SetRequestSent();
}
}
......@@ -344,7 +344,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
yield return this; // acts as either an EXEC or an UNWATCH, depending on "aborted"
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
physical.WriteHeader(Command, 0);
}
......
......@@ -141,7 +141,8 @@ public void ConnectionFail(Message message, ConnectionFailureType fail, Exceptio
{
PhysicalConnection.IdentifyFailureType(innerException, ref fail);
string exMessage = fail.ToString() + (message == null ? "" : (" on " + message.Command));
string exMessage = fail.ToString() + (message == null ? "" : (" on " + (
fail == ConnectionFailureType.ProtocolFailure ? message.ToString() : message.CommandAndKey)));
var ex = innerException == null ? new RedisConnectionException(fail, exMessage)
: new RedisConnectionException(fail, exMessage, innerException);
SetException(message, ex);
......@@ -204,12 +205,12 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, Ra
{
if (isMoved && (message.Flags & CommandFlags.NoRedirect) != 0)
{
err = $"Key has MOVED from Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed. ";
err = $"Key has MOVED from Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. ";
}
else
{
unableToConnectError = true;
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
err = $"Endpoint {endpoint} serving hashslot {hashSlot} is not reachable at this point of time. Please check connectTimeout value. If it is low, try increasing it to give the ConnectionMultiplexer a chance to recover from the network disconnect. ";
}
err += ConnectionMultiplexer.GetThreadPoolAndCPUSummary(bridge.Multiplexer.IncludePerformanceCountersInExceptions);
}
......@@ -349,7 +350,7 @@ public TimerMessage(int db, CommandFlags flags, RedisCommand command, RedisValue
this.value = value;
}
internal override void WriteImpl(PhysicalConnection physical)
protected override void WriteImpl(PhysicalConnection physical)
{
if (value.IsNull)
{
......
......@@ -443,10 +443,10 @@ internal Message GetTracerMessage(bool assertIdentity)
return msg;
}
internal bool IsSelectable(RedisCommand command)
internal bool IsSelectable(RedisCommand command, bool allowDisconnected = false)
{
var bridge = unselectableReasons == 0 ? GetBridge(command, false) : null;
return bridge?.IsConnected == true;
return bridge != null && (allowDisconnected || bridge.IsConnected);
}
internal Task OnEstablishingAsync(PhysicalConnection connection, TextWriter log)
......@@ -631,7 +631,7 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
else
{
Multiplexer.Trace("Writing direct: " + message);
connection.Bridge.WriteMessageDirect(connection, message);
connection.Bridge.WriteMessageTakingWriteLock(connection, message);
}
}
}
......
......@@ -124,6 +124,7 @@ public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isM
{
message.SetAsking(!isMoved);
message.SetNoRedirect(); // once is enough
if (isMoved) message.SetInternalCall();
// note that everything so far is talking about MASTER nodes; we might be
// wanting a SLAVE, so we'll check
......@@ -132,16 +133,16 @@ public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isM
switch (Message.GetMasterSlaveFlags(message.Flags))
{
case CommandFlags.DemandMaster:
resendVia = server.IsSelectable(command) ? server : null;
resendVia = server.IsSelectable(command, isMoved) ? server : null;
break;
case CommandFlags.PreferMaster:
resendVia = server.IsSelectable(command) ? server : FindSlave(server, command);
resendVia = server.IsSelectable(command, isMoved) ? server : FindSlave(server, command);
break;
case CommandFlags.PreferSlave:
resendVia = FindSlave(server, command) ?? (server.IsSelectable(command) ? server : null);
resendVia = FindSlave(server, command, isMoved) ?? (server.IsSelectable(command, isMoved) ? server : null);
break;
case CommandFlags.DemandSlave:
resendVia = FindSlave(server, command);
resendVia = FindSlave(server, command, isMoved);
break;
}
if (resendVia == null)
......@@ -235,9 +236,9 @@ private ServerEndPoint FindMaster(ServerEndPoint endpoint, RedisCommand command)
return null;
}
private ServerEndPoint FindSlave(ServerEndPoint endpoint, RedisCommand command)
private ServerEndPoint FindSlave(ServerEndPoint endpoint, RedisCommand command, bool allowDisconnected = false)
{
if (endpoint.IsSlave && endpoint.IsSelectable(command)) return endpoint;
if (endpoint.IsSlave && endpoint.IsSelectable(command, allowDisconnected)) return endpoint;
var slaves = endpoint.Slaves;
var len = slaves.Length;
......@@ -245,7 +246,7 @@ private ServerEndPoint FindSlave(ServerEndPoint endpoint, RedisCommand command)
for (int i = 0; i < len; i++)
{
endpoint = slaves[(int)(((uint)i + startOffset) % len)];
if (endpoint.IsSlave && endpoint.IsSelectable(command)) return endpoint;
if (endpoint.IsSlave && endpoint.IsSelectable(command, allowDisconnected)) return endpoint;
}
return null;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment