Commit 4b48086b authored by Marc Gravell's avatar Marc Gravell

not having a bridge is no excuse not to complete something; if the bridge is...

not having a bridge is no excuse not to complete something; if the bridge is unavailable, complete things on the shared manager (which will defer to the thread-pool if disposed)
parent 21898f22
using System; using System;
using System.Collections.Generic;
using System.Text;
using System.Threading; using System.Threading;
namespace StackExchange.Redis namespace StackExchange.Redis
...@@ -18,6 +16,14 @@ public CompletionManager(ConnectionMultiplexer multiplexer, string name) ...@@ -18,6 +16,14 @@ public CompletionManager(ConnectionMultiplexer multiplexer, string name)
this.name = name; this.name = name;
} }
internal static void SharedCompleteSyncOrAsync(ICompletable operation)
{
if (operation == null) return;
if (!operation.TryComplete(false))
{
SocketManager.Shared.ScheduleTask(s_AnyOrderCompletionHandler, operation);
}
}
public void CompleteSyncOrAsync(ICompletable operation) public void CompleteSyncOrAsync(ICompletable operation)
{ {
if (operation == null) return; if (operation == null) return;
......
...@@ -9,6 +9,11 @@ ...@@ -9,6 +9,11 @@
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
internal static class PhysicalBridgeHelpers
{
public static void CompleteSyncOrAsync(this PhysicalBridge bridge, ICompletable operation)
=> PhysicalBridge.CompleteSyncOrAsyncImpl(bridge, operation);
}
internal sealed partial class PhysicalBridge : IDisposable internal sealed partial class PhysicalBridge : IDisposable
{ {
internal readonly string Name; internal readonly string Name;
...@@ -86,9 +91,11 @@ public long SubscriptionCount ...@@ -86,9 +91,11 @@ public long SubscriptionCount
internal long OperationCount => Interlocked.Read(ref operationCount); internal long OperationCount => Interlocked.Read(ref operationCount);
public void CompleteSyncOrAsync(ICompletable operation) internal static void CompleteSyncOrAsyncImpl(PhysicalBridge bridge, ICompletable operation)
{ {
completionManager.CompleteSyncOrAsync(operation); var manager = bridge?.completionManager;
if (manager != null) manager.CompleteSyncOrAsync(operation);
else CompletionManager.SharedCompleteSyncOrAsync(operation);
} }
public void Dispose() public void Dispose()
...@@ -347,7 +354,7 @@ private void AbandonPendingBacklog(Exception ex) ...@@ -347,7 +354,7 @@ private void AbandonPendingBacklog(Exception ex)
if(next != null) if(next != null)
{ {
next.SetException(ex); next.SetException(ex);
CompleteSyncOrAsync(next); this.CompleteSyncOrAsync(next);
} }
} while (next != null); } while (next != null);
} }
...@@ -541,7 +548,7 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me ...@@ -541,7 +548,7 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me
// killed the underlying connection // killed the underlying connection
Trace("Unable to write to server"); Trace("Unable to write to server");
next.Fail(ConnectionFailureType.ProtocolFailure, null, "failure before write: " + result.ToString()); next.Fail(ConnectionFailureType.ProtocolFailure, null, "failure before write: " + result.ToString());
CompleteSyncOrAsync(next); this.CompleteSyncOrAsync(next);
return result; return result;
} }
//The parent message (next) may be returned from GetMessages //The parent message (next) may be returned from GetMessages
...@@ -741,7 +748,7 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne ...@@ -741,7 +748,7 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne
{ {
Trace("Write failed: " + ex.Message); Trace("Write failed: " + ex.Message);
message.Fail(ConnectionFailureType.InternalFailure, ex, null); message.Fail(ConnectionFailureType.InternalFailure, ex, null);
CompleteSyncOrAsync(message); this.CompleteSyncOrAsync(message);
// this failed without actually writing; we're OK with that... unless there's a transaction // this failed without actually writing; we're OK with that... unless there's a transaction
if (connection?.TransactionActive == true) if (connection?.TransactionActive == true)
...@@ -756,7 +763,7 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne ...@@ -756,7 +763,7 @@ private WriteResult WriteMessageToServerInsideWriteLock(PhysicalConnection conne
{ {
Trace("Write failed: " + ex.Message); Trace("Write failed: " + ex.Message);
message.Fail(ConnectionFailureType.InternalFailure, ex, null); message.Fail(ConnectionFailureType.InternalFailure, ex, null);
CompleteSyncOrAsync(message); this.CompleteSyncOrAsync(message);
// we're not sure *what* happened here; probably an IOException; kill the connection // we're not sure *what* happened here; probably an IOException; kill the connection
connection?.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); connection?.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
......
...@@ -377,7 +377,7 @@ void add(string lk, string sk, string v) ...@@ -377,7 +377,7 @@ void add(string lk, string sk, string v)
var next = _writtenAwaitingResponse.Dequeue(); var next = _writtenAwaitingResponse.Dequeue();
BridgeCouldBeNull?.Trace("Failing: " + next); BridgeCouldBeNull?.Trace("Failing: " + next);
next.SetException(innerException is RedisException ? innerException : outerException); next.SetException(innerException is RedisException ? innerException : outerException);
BridgeCouldBeNull?.CompleteSyncOrAsync(next); BridgeCouldBeNull.CompleteSyncOrAsync(next);
} }
} }
...@@ -1229,7 +1229,7 @@ private void MatchResult(RawResult result) ...@@ -1229,7 +1229,7 @@ private void MatchResult(RawResult result)
Trace("Response to: " + msg); Trace("Response to: " + msg);
if (msg.ComputeResult(this, result)) if (msg.ComputeResult(this, result))
{ {
BridgeCouldBeNull?.CompleteSyncOrAsync(msg); BridgeCouldBeNull.CompleteSyncOrAsync(msg);
} }
} }
......
...@@ -86,7 +86,7 @@ internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState) ...@@ -86,7 +86,7 @@ internal Task RemoveAllSubscriptions(CommandFlags flags, object asyncState)
foreach (var pair in subscriptions) foreach (var pair in subscriptions)
{ {
var msg = pair.Value.ForSyncShutdown(); var msg = pair.Value.ForSyncShutdown();
if(msg != null) UnprocessableCompletionManager?.CompleteSyncOrAsync(msg); if(msg != null) UnprocessableCompletionManager.CompleteSyncOrAsync(msg);
pair.Value.Remove(true, null); pair.Value.Remove(true, null);
pair.Value.Remove(false, null); pair.Value.Remove(false, null);
......
...@@ -341,7 +341,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -341,7 +341,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
foreach (var op in InnerOperations) foreach (var op in InnerOperations)
{ {
op.Wrapped.Cancel(); op.Wrapped.Cancel();
bridge?.CompleteSyncOrAsync(op.Wrapped); bridge.CompleteSyncOrAsync(op.Wrapped);
} }
} }
connection.Trace("End of transaction: " + Command); connection.Trace("End of transaction: " + Command);
...@@ -387,7 +387,7 @@ public override bool SetResult(PhysicalConnection connection, Message message, R ...@@ -387,7 +387,7 @@ public override bool SetResult(PhysicalConnection connection, Message message, R
foreach (var op in tran.InnerOperations) foreach (var op in tran.InnerOperations)
{ {
ServerFail(op.Wrapped, error); ServerFail(op.Wrapped, error);
bridge?.CompleteSyncOrAsync(op.Wrapped); bridge.CompleteSyncOrAsync(op.Wrapped);
} }
} }
return base.SetResult(connection, message, result); return base.SetResult(connection, message, result);
...@@ -416,7 +416,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -416,7 +416,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
foreach (var op in wrapped) foreach (var op in wrapped)
{ {
op.Wrapped.Cancel(); op.Wrapped.Cancel();
bridge?.CompleteSyncOrAsync(op.Wrapped); bridge.CompleteSyncOrAsync(op.Wrapped);
} }
SetResult(message, false); SetResult(message, false);
return true; return true;
...@@ -432,7 +432,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -432,7 +432,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
foreach (var op in wrapped) foreach (var op in wrapped)
{ {
op.Wrapped.Cancel(); op.Wrapped.Cancel();
bridge?.CompleteSyncOrAsync(op.Wrapped); bridge.CompleteSyncOrAsync(op.Wrapped);
} }
SetResult(message, false); SetResult(message, false);
return true; return true;
...@@ -444,7 +444,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -444,7 +444,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{ {
if (wrapped[i].Wrapped.ComputeResult(connection, arr[i])) if (wrapped[i].Wrapped.ComputeResult(connection, arr[i]))
{ {
bridge?.CompleteSyncOrAsync(wrapped[i].Wrapped); bridge.CompleteSyncOrAsync(wrapped[i].Wrapped);
} }
} }
SetResult(message, true); SetResult(message, true);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment