Commit f635ba75 authored by Marc Gravell's avatar Marc Gravell

ackowledge that we can timeout while waiting to get on the write queue; up the...

ackowledge that we can timeout while waiting to get on the write queue; up the sync timeout default; rename Queue=>Write - we don't have a queue any more
parent 62f630ca
......@@ -312,7 +312,7 @@ public int ConnectTimeout
/// <summary>
/// Specifies the time in milliseconds that the system should allow for synchronous operations (defaults to 1 second)
/// </summary>
public int SyncTimeout { get { return syncTimeout.GetValueOrDefault(1000); } set { syncTimeout = value; } }
public int SyncTimeout { get { return syncTimeout.GetValueOrDefault(5000); } set { syncTimeout = value; } }
/// <summary>
/// Tie-breaker used to choose between masters (must match the endpoint exactly)
......
......@@ -335,7 +335,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
if (!node.IsConnected) continue;
LogLocked(log, "Attempting to set tie-breaker on {0}...", Format.ToString(node.EndPoint));
msg = Message.Create(0, flags, RedisCommand.SET, tieBreakerKey, newMaster);
node.QueueDirectFireAndForget(msg, ResultProcessor.DemandOK);
node.WriteDirectFireAndForget(msg, ResultProcessor.DemandOK);
}
}
......@@ -356,7 +356,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
{
LogLocked(log, "Resending tie-breaker to {0}...", Format.ToString(server.EndPoint));
msg = Message.Create(0, flags, RedisCommand.SET, tieBreakerKey, newMaster);
server.QueueDirectFireAndForget(msg, ResultProcessor.DemandOK);
server.WriteDirectFireAndForget(msg, ResultProcessor.DemandOK);
}
// There's an inherent race here in zero-lantency environments (e.g. when Redis is on localhost) when a broadcast is specified
......@@ -376,7 +376,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
if (!node.IsConnected) continue;
LogLocked(log, "Broadcasting via {0}...", Format.ToString(node.EndPoint));
msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, newMaster);
node.QueueDirectFireAndForget(msg, ResultProcessor.Int64);
node.WriteDirectFireAndForget(msg, ResultProcessor.Int64);
}
}
......@@ -388,7 +388,7 @@ internal void MakeMaster(ServerEndPoint server, ReplicationChangeOptions options
LogLocked(log, "Enslaving {0}...", Format.ToString(node.EndPoint));
msg = RedisServer.CreateSlaveOfMessage(server.EndPoint, flags);
node.QueueDirectFireAndForget(msg, ResultProcessor.DemandOK);
node.WriteDirectFireAndForget(msg, ResultProcessor.DemandOK);
}
}
......@@ -1322,7 +1322,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
Message msg = Message.Create(0, flags, RedisCommand.GET, tieBreakerKey);
msg.SetInternalCall();
msg = LoggingMessage.Create(log, msg);
tieBreakers[i] = server.QueueDirectAsync(msg, ResultProcessor.String);
tieBreakers[i] = server.WriteDirectAsync(msg, ResultProcessor.String);
}
}
......@@ -1740,7 +1740,7 @@ internal ServerEndPoint SelectServer(int db, RedisCommand command, CommandFlags
return ServerSelectionStrategy.Select(db, command, key, flags);
}
private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> processor, ResultBox<T> resultBox, ref ServerEndPoint server)
private WriteResult TryPushMessageToBridge<T>(Message message, ResultProcessor<T> processor, ResultBox<T> resultBox, ref ServerEndPoint server)
{
message.SetSource(processor, resultBox);
......@@ -1791,10 +1791,10 @@ private bool TryPushMessageToBridge<T>(Message message, ResultProcessor<T> proce
}
Trace("Queueing on server: " + message);
if (server.TryEnqueue(message)) return true;
return server.TryWrite(message);
}
Trace("No server or server unavailable - aborting: " + message);
return false;
return WriteResult.NoConnectionAvailable;
}
/// <summary>
......@@ -1947,13 +1947,30 @@ internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> process
{
var tcs = TaskSource.Create<T>(state);
var source = ResultBox<T>.Get(tcs);
if (!TryPushMessageToBridge(message, processor, source, ref server))
var result = TryPushMessageToBridge(message, processor, source, ref server);
if (result != WriteResult.Success)
{
ThrowFailed(tcs, ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot()));
var ex = GetException(result, message, server);
ThrowFailed(tcs, ex);
}
return tcs.Task;
}
}
internal Exception GetException(WriteResult result, Message message, ServerEndPoint server)
{
switch (result)
{
case WriteResult.Success: return null;
case WriteResult.NoConnectionAvailable:
return ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot());
case WriteResult.TimeoutBeforeWrite:
return ExceptionFactory.Timeout(IncludeDetailInExceptions, "The timeout was reached before the message could be written to the output buffer, and it was not sent ("
+ Format.ToString(TimeoutMilliseconds)+ "ms)", message, server);
case WriteResult.WriteFailure:
default:
return ExceptionFactory.ConnectionFailure(IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "An unknown error occurred when writing the message", server);
}
}
internal static void ThrowFailed<T>(TaskCompletionSource<T> source, Exception unthrownException)
{
......@@ -1990,9 +2007,10 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
lock (source)
{
if (!TryPushMessageToBridge(message, processor, source, ref server))
var result = TryPushMessageToBridge(message, processor, source, ref server);
if (result != WriteResult.Success)
{
throw ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, IncludePerformanceCountersInExceptions, message.Command, message, server, GetServerSnapshot());
throw GetException(result, message, server);
}
if (Monitor.Wait(source, TimeoutMilliseconds))
......@@ -2011,7 +2029,7 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
}
else
{
var sb = new StringBuilder("Timeout performing ").Append(message.CommandAndKey);
var sb = new StringBuilder("Timeout performing ").Append(message.CommandAndKey).Append(" (").Append(Format.ToString(TimeoutMilliseconds)).Append("ms)");
data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) };
void add(string lk, string sk, string v)
{
......@@ -2184,4 +2202,11 @@ public Task<long> PublishReconfigureAsync(CommandFlags flags = CommandFlags.None
return GetSubscriber().PublishAsync(channel, RedisLiterals.Wildcard, flags);
}
}
internal enum WriteResult
{
Success,
NoConnectionAvailable,
TimeoutBeforeWrite,
WriteFailure,
}
}
......@@ -9,15 +9,6 @@
namespace StackExchange.Redis
{
internal enum WriteResult
{
QueueEmptyAfterWrite,
NothingToDo,
MoreWork,
CompetingWriter,
NoConnection,
}
internal sealed partial class PhysicalBridge : IDisposable
{
internal readonly string Name;
......@@ -50,14 +41,16 @@ internal sealed partial class PhysicalBridge : IDisposable
private volatile int state = (int)State.Disconnected;
public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type)
public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int timeoutMilliseconds)
{
ServerEndPoint = serverEndPoint;
ConnectionType = type;
Multiplexer = serverEndPoint.Multiplexer;
Name = Format.ToString(serverEndPoint.EndPoint) + "/" + ConnectionType.ToString();
completionManager = new CompletionManager(Multiplexer, Name);
TimeoutMilliseconds = timeoutMilliseconds;
}
private readonly int TimeoutMilliseconds;
public enum State : byte
{
......@@ -116,7 +109,7 @@ public void ReportNextFailure()
public void TryConnect(TextWriter log) => GetConnection(log);
public bool TryEnqueue(Message message, bool isSlave)
public WriteResult TryWrite(Message message, bool isSlave)
{
if (isDisposed) throw new ObjectDisposedException(Name);
if (!IsConnected)
......@@ -131,23 +124,23 @@ public bool TryEnqueue(Message message, bool isSlave)
queue.Enqueue(message);
}
message.SetEnqueued();
return true;
return WriteResult.Success; // we'll take it...
}
else
{
// sorry, we're just not ready for you yet;
return false;
return WriteResult.NoConnectionAvailable;
}
}
var physical = this.physical;
if (physical == null) return false;
if (physical == null) return WriteResult.NoConnectionAvailable;
WriteMessageDirect(physical, message);
var result = WriteMessageDirect(physical, message);
LogNonPreferred(message.Flags, isSlave);
return true;
return result;
}
internal void AppendProfile(StringBuilder sb)
......@@ -242,9 +235,12 @@ internal void KeepAlive()
{
msg.SetInternalCall();
Multiplexer.Trace("Enqueue: " + msg);
if (!TryEnqueue(msg, ServerEndPoint.IsSlave))
var result = TryWrite(msg, ServerEndPoint.IsSlave);
if (result != WriteResult.Success)
{
OnInternalError(ExceptionFactory.NoConnectionAvailable(Multiplexer.IncludeDetailInExceptions, Multiplexer.IncludePerformanceCountersInExceptions, msg.Command, msg, ServerEndPoint, Multiplexer.GetServerSnapshot()));
var ex = Multiplexer.GetException(result, msg, ServerEndPoint);
OnInternalError(ex);
}
}
}
......@@ -505,13 +501,16 @@ internal bool TryEnqueue(List<Message> messages, bool isSlave)
/// <summary>
/// This writes a message to the output stream
/// </summary>
internal bool WriteMessageDirect(PhysicalConnection physical, Message next)
internal WriteResult WriteMessageDirect(PhysicalConnection physical, Message next)
{
Trace("Writing: " + next);
next.SetEnqueued();
bool result;
lock (WriteLock)
WriteResult result;
bool haveLock = false;
Monitor.TryEnter(WriteLock, TimeoutMilliseconds, ref haveLock);
if (!haveLock) return WriteResult.TimeoutBeforeWrite;
try
{
var messageIsSent = false;
if (next is IMultiMessage)
......@@ -519,14 +518,15 @@ internal bool WriteMessageDirect(PhysicalConnection physical, Message next)
SelectDatabase(physical, next); // need to switch database *before* the transaction
foreach (var subCommand in ((IMultiMessage)next).GetMessages(physical))
{
if (!WriteMessageToServer(physical, subCommand))
result = WriteMessageToServer(physical, subCommand);
if (result != WriteResult.Success)
{
// we screwed up; abort; note that WriteMessageToServer already
// killed the underlying connection
Trace("Unable to write to server");
next.Fail(ConnectionFailureType.ProtocolFailure, null);
CompleteSyncOrAsync(next);
return false;
return result;
}
//The parent message (next) may be returned from GetMessages
//and should not be marked as sent again below
......@@ -537,7 +537,7 @@ internal bool WriteMessageDirect(PhysicalConnection physical, Message next)
next.SetRequestSent(); // well, it was attempted, at least...
}
result = true;
result = WriteResult.Success;
}
else
{
......@@ -545,6 +545,10 @@ internal bool WriteMessageDirect(PhysicalConnection physical, Message next)
}
physical.WakeWriterAndCheckForThrottle();
}
finally
{
if(haveLock) Monitor.Exit(WriteLock);
}
return result;
}
......@@ -645,9 +649,9 @@ private void SelectDatabase(PhysicalConnection connection, Message message)
}
}
private bool WriteMessageToServer(PhysicalConnection connection, Message message)
private WriteResult WriteMessageToServer(PhysicalConnection connection, Message message)
{
if (message == null) return true;
if (message == null) return WriteResult.Success; // for some definition of success
try
{
......@@ -715,7 +719,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
connection.SetUnknownDatabase();
break;
}
return true;
return WriteResult.Success;
}
catch (RedisCommandException ex)
{
......@@ -728,9 +732,9 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
{
// we left it in a broken state; need to kill the connection
connection.RecordConnectionFailed(ConnectionFailureType.ProtocolFailure, ex);
return false;
return WriteResult.WriteFailure;
}
return true;
return WriteResult.Success;
}
catch (Exception ex)
{
......@@ -740,7 +744,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
// we're not sure *what* happened here; probably an IOException; kill the connection
connection?.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
return false;
return WriteResult.WriteFailure;
}
}
}
......
......@@ -593,7 +593,7 @@ public void SlaveOf(EndPoint master, CommandFlags flags = CommandFlags.None)
{
var del = Message.Create(0, CommandFlags.FireAndForget | CommandFlags.NoRedirect, RedisCommand.DEL, (RedisKey)configuration.TieBreaker);
del.SetInternalCall();
server.QueueDirectFireAndForget(del, ResultProcessor.Boolean);
server.WriteDirectFireAndForget(del, ResultProcessor.Boolean);
}
ExecuteSync(slaveofMsg, ResultProcessor.DemandOK);
......@@ -603,7 +603,7 @@ public void SlaveOf(EndPoint master, CommandFlags flags = CommandFlags.None)
{
var pub = Message.Create(-1, CommandFlags.FireAndForget | CommandFlags.NoRedirect, RedisCommand.PUBLISH, (RedisValue)channel, RedisLiterals.Wildcard);
pub.SetInternalCall();
server.QueueDirectFireAndForget(pub, ResultProcessor.Int64);
server.WriteDirectFireAndForget(pub, ResultProcessor.Int64);
}
}
......
......@@ -178,7 +178,7 @@ public Task SubscribeToServer(ConnectionMultiplexer multiplexer, RedisChannel ch
var msg = Message.Create(-1, flags, cmd, channel);
return selected.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
return selected.WriteDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
}
public Task UnsubscribeFromServer(RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
......@@ -189,7 +189,7 @@ public Task UnsubscribeFromServer(RedisChannel channel, CommandFlags flags, obje
var cmd = channel.IsPatternBased ? RedisCommand.PUNSUBSCRIBE : RedisCommand.UNSUBSCRIBE;
var msg = Message.Create(-1, flags, cmd, channel);
if (internalCall) msg.SetInternalCall();
return oldOwner.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
return oldOwner.WriteDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
}
internal ServerEndPoint GetOwner() => Interlocked.CompareExchange(ref owner, null, null);
......@@ -201,7 +201,7 @@ internal void Resubscribe(RedisChannel channel, ServerEndPoint server)
var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
var msg = Message.Create(-1, CommandFlags.FireAndForget, cmd, channel);
msg.SetInternalCall();
server.QueueDirectFireAndForget(msg, ResultProcessor.TrackSubscriptions);
server.WriteDirectFireAndForget(msg, ResultProcessor.TrackSubscriptions);
}
}
......
......@@ -236,7 +236,7 @@ public void SetUnselectable(UnselectableFlags flags)
public override string ToString() => Format.ToString(EndPoint);
public bool TryEnqueue(Message message) => GetBridge(message.Command)?.TryEnqueue(message, isSlave) == true;
public WriteResult TryWrite(Message message) => GetBridge(message.Command)?.TryWrite(message, isSlave) ?? WriteResult.NoConnectionAvailable;
internal void Activate(ConnectionType type, TextWriter log)
{
......@@ -330,7 +330,7 @@ internal Task Close()
}
else
{
return QueueDirectAsync(Message.Create(-1, CommandFlags.None, RedisCommand.QUIT), ResultProcessor.DemandOK, bridge: interactive);
return WriteDirectAsync(Message.Create(-1, CommandFlags.None, RedisCommand.QUIT), ResultProcessor.DemandOK, bridge: interactive);
}
}
......@@ -516,7 +516,7 @@ internal bool CheckInfoReplication()
{
var msg = Message.Create(-1, CommandFlags.FireAndForget | CommandFlags.HighPriority | CommandFlags.NoRedirect, RedisCommand.INFO, RedisLiterals.replication);
msg.SetInternalCall();
QueueDirectFireAndForget(msg, ResultProcessor.AutoConfigure, bridge);
WriteDirectFireAndForget(msg, ResultProcessor.AutoConfigure, bridge);
return true;
}
return false;
......@@ -546,26 +546,28 @@ internal void OnHeartbeat()
}
}
internal Task<T> QueueDirectAsync<T>(Message message, ResultProcessor<T> processor, object asyncState = null, PhysicalBridge bridge = null)
internal Task<T> WriteDirectAsync<T>(Message message, ResultProcessor<T> processor, object asyncState = null, PhysicalBridge bridge = null)
{
var tcs = TaskSource.Create<T>(asyncState);
var source = ResultBox<T>.Get(tcs);
message.SetSource(processor, source);
if (bridge == null) bridge = GetBridge(message.Command);
if (!bridge.TryEnqueue(message, isSlave))
var result = bridge.TryWrite(message, isSlave);
if (result != WriteResult.Success)
{
ConnectionMultiplexer.ThrowFailed(tcs, ExceptionFactory.NoConnectionAvailable(Multiplexer.IncludeDetailInExceptions, Multiplexer.IncludePerformanceCountersInExceptions, message.Command, message, this, Multiplexer.GetServerSnapshot()));
var ex = Multiplexer.GetException(result, message, this);
ConnectionMultiplexer.ThrowFailed(tcs, ex);
}
return tcs.Task;
}
internal void QueueDirectFireAndForget<T>(Message message, ResultProcessor<T> processor, PhysicalBridge bridge = null)
internal void WriteDirectFireAndForget<T>(Message message, ResultProcessor<T> processor, PhysicalBridge bridge = null)
{
if (message != null)
{
message.SetSource(processor, null);
Multiplexer.Trace("Enqueue: " + message);
(bridge ?? GetBridge(message.Command)).TryEnqueue(message, isSlave);
(bridge ?? GetBridge(message.Command)).TryWrite(message, isSlave);
}
}
......@@ -579,7 +581,7 @@ internal Task<bool> SendTracer(TextWriter log = null)
{
var msg = GetTracerMessage(false);
msg = LoggingMessage.Create(log, msg);
return QueueDirectAsync(msg, ResultProcessor.Tracer);
return WriteDirectAsync(msg, ResultProcessor.Tracer);
}
internal string Summary()
......@@ -623,7 +625,7 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
if (connection == null)
{
Multiplexer.Trace("Enqueue: " + message);
GetBridge(message.Command).TryEnqueue(message, isSlave);
GetBridge(message.Command).TryWrite(message, isSlave);
}
else
{
......@@ -636,7 +638,7 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
{
Multiplexer.Trace(type.ToString());
var bridge = new PhysicalBridge(this, type);
var bridge = new PhysicalBridge(this, type, Multiplexer.TimeoutMilliseconds);
bridge.TryConnect(log);
return bridge;
}
......
......@@ -151,7 +151,7 @@ public bool TryResend(int hashSlot, Message message, EndPoint endpoint, bool isM
else
{
message.PrepareToResend(resendVia, isMoved);
retry = resendVia.TryEnqueue(message);
retry = resendVia.TryWrite(message) == WriteResult.Success;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment