Commit 0410468b authored by Marc Gravell's avatar Marc Gravell

kill the unsent queue when connection fail; this is far more reasonable...

kill the unsent queue when connection fail; this is far more reasonable (expected), and ensures async works correctly
parent 860becae
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
namespace StackExchange.Redis.Tests
{
[TestFixture]
public class AsyncTests : TestBase
{
protected override string GetConfiguration()
{
return PrimaryServer + ":" + PrimaryPortString;
}
#if DEBUG // IRedisServerDebug and AllowConnect are only available if DEBUG is defined
[Test]
public void AsyncTasksReportFailureIfServerUnavailable()
{
SetExpectedAmbientFailureCount(-1); // this will get messy
using(var conn = Create(allowAdmin: true))
{
var server = (IRedisServerDebug)conn.GetServer(PrimaryServer, PrimaryPort);
RedisKey key = Me();
var db = conn.GetDatabase();
db.KeyDelete(key);
var a = db.SetAddAsync(key, "a");
var b = db.SetAddAsync(key, "b");
Assert.AreEqual(true, conn.Wait(a));
Assert.AreEqual(true, conn.Wait(b));
conn.AllowConnect = false;
server.SimulateConnectionFailure();
var c = db.SetAddAsync(key, "c");
Assert.IsTrue(c.IsFaulted, "faulted");
var ex = c.Exception.InnerExceptions.Single();
Assert.IsInstanceOf<RedisConnectionException>(ex);
Assert.AreEqual("No connection is available to service this operation: SADD", ex.Message);
}
}
#endif
}
}
...@@ -60,6 +60,7 @@ ...@@ -60,6 +60,7 @@
<Reference Include="Microsoft.CSharp" /> <Reference Include="Microsoft.CSharp" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="AsyncTests.cs" />
<Compile Include="BasicOps.cs" /> <Compile Include="BasicOps.cs" />
<Compile Include="Cluster.cs" /> <Compile Include="Cluster.cs" />
<Compile Include="Commands.cs" /> <Compile Include="Commands.cs" />
......
...@@ -68,6 +68,17 @@ internal void Add(ConnectionCounters other) ...@@ -68,6 +68,17 @@ internal void Add(ConnectionCounters other)
this.NonPreferredEndpointCount += other.NonPreferredEndpointCount; this.NonPreferredEndpointCount += other.NonPreferredEndpointCount;
} }
internal bool Any()
{
return CompletedAsynchronously != 0 || CompletedSynchronously != 0
|| FailedAsynchronously != 0 || OperationCount != 0
|| PendingUnsentItems != 0 || ResponsesAwaitingAsyncCompletion != 0
|| SentItemsAwaitingResponse != 0 || SocketCount != 0
|| Subscriptions != 0 || WriterCount != 0
|| NonPreferredEndpointCount != 0;
}
/// <summary> /// <summary>
/// Operations that have been requested, but which have not yet been sent to the server /// Operations that have been requested, but which have not yet been sent to the server
/// </summary> /// </summary>
......
...@@ -59,6 +59,7 @@ public ServerCounters GetCounters() ...@@ -59,6 +59,7 @@ public ServerCounters GetCounters()
{ {
counters.Add(snapshot[i].GetCounters()); counters.Add(snapshot[i].GetCounters());
} }
unprocessableCompletionManager.GetCounters(counters.Other);
return counters; return counters;
} }
...@@ -1098,7 +1099,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1098,7 +1099,7 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
{ {
Trace("Testing: " + Format.ToString(endpoints[i])); Trace("Testing: " + Format.ToString(endpoints[i]));
var server = GetServerEndPoint(endpoints[i]); var server = GetServerEndPoint(endpoints[i]);
server.ReportNextFailure(); //server.ReportNextFailure();
servers[i] = server; servers[i] = server;
if (reconfigureAll && server.IsConnected) if (reconfigureAll && server.IsConnected)
{ {
......
...@@ -59,6 +59,8 @@ public sealed class RedisServerException : RedisException ...@@ -59,6 +59,8 @@ public sealed class RedisServerException : RedisException
abstract class Message : ICompletable abstract class Message : ICompletable
{ {
public static readonly Message[] EmptyArray = new Message[0];
public readonly int Db; public readonly int Db;
protected RedisCommand command; protected RedisCommand command;
......
...@@ -26,6 +26,22 @@ public Message Dequeue() ...@@ -26,6 +26,22 @@ public Message Dequeue()
return null; return null;
} }
internal Message[] DequeueAll()
{
lock (regular)
{
int count = high.Count + regular.Count;
if (count == 0) return Message.EmptyArray;
var arr = new Message[count];
high.CopyTo(arr, 0);
regular.CopyTo(arr, high.Count);
high.Clear();
regular.Clear();
return arr;
}
}
public object SyncLock { get { return regular; } } public object SyncLock { get { return regular; } }
public Message PeekPing(out int queueLength) public Message PeekPing(out int queueLength)
{ {
......
...@@ -384,10 +384,18 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -384,10 +384,18 @@ internal void OnHeartbeat(bool ifConnectedOnly)
case (int)State.Disconnected: case (int)State.Disconnected:
if (!ifConnectedOnly) if (!ifConnectedOnly)
{ {
AbortUnsent();
multiplexer.Trace("Resurrecting " + this.ToString()); multiplexer.Trace("Resurrecting " + this.ToString());
GetConnection(); GetConnection();
} }
break; break;
case (int)State.Connecting:
default:
if (!ifConnectedOnly)
{
AbortUnsent();
}
break;
} }
} }
catch (Exception ex) catch (Exception ex)
...@@ -469,7 +477,7 @@ internal bool ConfirmRemoveFromWriteQueue() ...@@ -469,7 +477,7 @@ internal bool ConfirmRemoveFromWriteQueue()
/// connected" handshake (when there is no dequeue loop) - otherwise, /// connected" handshake (when there is no dequeue loop) - otherwise,
/// you can pretty much assume you're going to destroy the stream /// you can pretty much assume you're going to destroy the stream
/// </summary> /// </summary>
internal void WriteMessageDirect(PhysicalConnection tmp, Message next) internal bool WriteMessageDirect(PhysicalConnection tmp, Message next)
{ {
Trace("Writing: " + next); Trace("Writing: " + next);
if (next is IMultiMessage) if (next is IMultiMessage)
...@@ -484,13 +492,14 @@ internal void WriteMessageDirect(PhysicalConnection tmp, Message next) ...@@ -484,13 +492,14 @@ internal void WriteMessageDirect(PhysicalConnection tmp, Message next)
Trace("Unable to write to server"); Trace("Unable to write to server");
next.Fail(ConnectionFailureType.ProtocolFailure, null); next.Fail(ConnectionFailureType.ProtocolFailure, null);
CompleteSyncOrAsync(next); CompleteSyncOrAsync(next);
break; return false;
} }
} }
return true;
} }
else else
{ {
WriteMessageToServer(tmp, next); return WriteMessageToServer(tmp, next);
} }
} }
...@@ -500,10 +509,27 @@ private State ChangeState(State newState) ...@@ -500,10 +509,27 @@ private State ChangeState(State newState)
if (oldState != newState) if (oldState != newState)
{ {
multiplexer.Trace(connectionType + " state changed from " + oldState + " to " + newState); multiplexer.Trace(connectionType + " state changed from " + oldState + " to " + newState);
if (newState == State.Disconnected)
{
AbortUnsent();
}
} }
return oldState; return oldState;
} }
private void AbortUnsent()
{
var dead = queue.DequeueAll();
Trace(dead.Length != 0, "Aborting " + dead.Length + " messages");
for (int i = 0; i < dead.Length; i++)
{
var msg = dead[i];
msg.Fail(ConnectionFailureType.UnableToResolvePhysicalConnection, null);
CompleteSyncOrAsync(msg);
}
}
private bool ChangeState(State oldState, State newState) private bool ChangeState(State oldState, State newState)
{ {
bool result = Interlocked.CompareExchange(ref state, (int)newState, (int)oldState) == (int)oldState; bool result = Interlocked.CompareExchange(ref state, (int)newState, (int)oldState) == (int)oldState;
...@@ -659,7 +685,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message ...@@ -659,7 +685,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
message.Fail(ConnectionFailureType.InternalFailure, ex); message.Fail(ConnectionFailureType.InternalFailure, ex);
CompleteSyncOrAsync(message); CompleteSyncOrAsync(message);
// we're not sure *what* happened here; kill the connection // we're not sure *what* happened here; probably an IOException; kill the connection
if(connection != null) connection.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); if(connection != null) connection.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
return false; return false;
} }
...@@ -683,6 +709,7 @@ internal WriteResult WriteQueue(int maxWork) ...@@ -683,6 +709,7 @@ internal WriteResult WriteQueue(int maxWork)
var conn = GetConnection(); var conn = GetConnection();
if(conn == null) if(conn == null)
{ {
AbortUnsent();
Trace("Connection not available; exiting"); Trace("Connection not available; exiting");
return WriteResult.NoConnection; return WriteResult.NoConnection;
} }
...@@ -702,7 +729,12 @@ internal WriteResult WriteQueue(int maxWork) ...@@ -702,7 +729,12 @@ internal WriteResult WriteQueue(int maxWork)
last = next; last = next;
Trace("Now pending: " + GetPendingCount()); Trace("Now pending: " + GetPendingCount());
WriteMessageDirect(conn, next); if(!WriteMessageDirect(conn, next))
{
AbortUnsent();
Trace("write failed; connection is toast; exiting");
return WriteResult.NoConnection;
}
count++; count++;
if (maxWork > 0 && count >= maxWork) if (maxWork > 0 && count >= maxWork)
{ {
......
...@@ -160,8 +160,8 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -160,8 +160,8 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
if (isCurrent && Interlocked.CompareExchange(ref failureReported, 1, 0) == 0) if (isCurrent && Interlocked.CompareExchange(ref failureReported, 1, 0) == 0)
{ {
try //try
{ //{
long now = Environment.TickCount, lastRead = Interlocked.Read(ref lastReadTickCount), lastWrite = Interlocked.Read(ref lastWriteTickCount), long now = Environment.TickCount, lastRead = Interlocked.Read(ref lastReadTickCount), lastWrite = Interlocked.Read(ref lastWriteTickCount),
lastBeat = Interlocked.Read(ref lastBeatTickCount); lastBeat = Interlocked.Read(ref lastBeatTickCount);
...@@ -175,12 +175,14 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception ...@@ -175,12 +175,14 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
var ex = innerException == null var ex = innerException == null
? new RedisConnectionException(failureType, message) ? new RedisConnectionException(failureType, message)
: new RedisConnectionException(failureType, message, innerException); : new RedisConnectionException(failureType, message, innerException);
throw ex;
} bridge.OnConnectionFailed(this, failureType, ex);
catch (Exception caught) // throw ex;
{ //}
bridge.OnConnectionFailed(this, failureType, caught); //catch (Exception caught)
} //{
// bridge.OnConnectionFailed(this, failureType, caught);
//}
} }
......
...@@ -96,9 +96,12 @@ internal override T ExecuteSync<T>(Message message, ResultProcessor<T> processor ...@@ -96,9 +96,12 @@ internal override T ExecuteSync<T>(Message message, ResultProcessor<T> processor
private void FailNoServer(List<Message> messages) private void FailNoServer(List<Message> messages)
{ {
if (messages == null) return; if (messages == null) return;
var completion = multiplexer.UnprocessableCompletionManager;
foreach(var msg in messages) foreach(var msg in messages)
{ {
msg.Fail(ConnectionFailureType.UnableToResolvePhysicalConnection, null); msg.Fail(ConnectionFailureType.UnableToResolvePhysicalConnection, null);
completion.CompleteSyncOrAsync(msg);
} }
} }
} }
......
...@@ -11,14 +11,15 @@ abstract partial class ResultBox ...@@ -11,14 +11,15 @@ abstract partial class ResultBox
public void SetException(Exception exception) public void SetException(Exception exception)
{ {
try this.exception = exception;
{ //try
throw exception; //{
} // throw exception;
catch (Exception caught) //}
{ // stacktrace etc //catch (Exception caught)
this.exception = caught; //{ // stacktrace etc
} // this.exception = caught;
//}
} }
public abstract bool TryComplete(bool isAsync); public abstract bool TryComplete(bool isAsync);
......
...@@ -14,6 +14,7 @@ internal ServerCounters(EndPoint endpoint) ...@@ -14,6 +14,7 @@ internal ServerCounters(EndPoint endpoint)
this.EndPoint = endpoint; this.EndPoint = endpoint;
this.Interactive = new ConnectionCounters(ConnectionType.Interactive); this.Interactive = new ConnectionCounters(ConnectionType.Interactive);
this.Subscription = new ConnectionCounters(ConnectionType.Subscription); this.Subscription = new ConnectionCounters(ConnectionType.Subscription);
this.Other = new ConnectionCounters(ConnectionType.None);
} }
/// <summary> /// <summary>
...@@ -29,14 +30,18 @@ internal ServerCounters(EndPoint endpoint) ...@@ -29,14 +30,18 @@ internal ServerCounters(EndPoint endpoint)
/// <summary> /// <summary>
/// Counters associated with the subscription (pub-sub) connection /// Counters associated with the subscription (pub-sub) connection
/// </summary> /// </summary>
public ConnectionCounters Subscription { get; private set; } public ConnectionCounters Subscription { get; private set; }
/// <summary>
/// Counters associated with other ambient activity
/// </summary>
public ConnectionCounters Other { get; private set; }
/// <summary> /// <summary>
/// Indicates the total number of outstanding items against this server /// Indicates the total number of outstanding items against this server
/// </summary> /// </summary>
public long TotalOutstanding { get { return Interactive.TotalOutstanding + Subscription.TotalOutstanding; } } public long TotalOutstanding { get { return Interactive.TotalOutstanding + Subscription.TotalOutstanding + Other.TotalOutstanding; } }
/// <summary> /// <summary>
/// See Object.ToString(); /// See Object.ToString();
...@@ -48,6 +53,11 @@ public override string ToString() ...@@ -48,6 +53,11 @@ public override string ToString()
Interactive.Append(sb); Interactive.Append(sb);
sb.Append("; sub "); sb.Append("; sub ");
Subscription.Append(sb); Subscription.Append(sb);
if (Other.Any())
{
sb.Append("; other ");
Other.Append(sb);
}
return sb.ToString(); return sb.ToString();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment