Commit e3f96fb5 authored by Marc Gravell's avatar Marc Gravell

only flush if write was successful; if it throws when writing, make sure that...

only flush if write was successful; if it throws when writing, make sure that items in a transaction are aborted
parent 12322121
...@@ -614,9 +614,10 @@ internal void Fail(ConnectionFailureType failure, Exception innerException, stri ...@@ -614,9 +614,10 @@ internal void Fail(ConnectionFailureType failure, Exception innerException, stri
resultProcessor?.ConnectionFail(this, failure, innerException, annotation); resultProcessor?.ConnectionFail(this, failure, innerException, annotation);
} }
internal void SetException(Exception exception) internal virtual void SetExceptionAndComplete(Exception exception, PhysicalBridge bridge)
{ {
resultBox?.SetException(exception); resultBox?.SetException(exception);
bridge.CompleteSyncOrAsync(this);
} }
internal bool TrySetResult<T>(T value) => resultBox is ResultBox<T> typed && typed.TrySetResult(value); internal bool TrySetResult<T>(T value) => resultBox is ResultBox<T> typed && typed.TrySetResult(value);
......
...@@ -371,8 +371,7 @@ private void AbandonPendingBacklog(Exception ex) ...@@ -371,8 +371,7 @@ private void AbandonPendingBacklog(Exception ex)
if (next != null) if (next != null)
{ {
Multiplexer?.OnMessageFaulted(next, ex); Multiplexer?.OnMessageFaulted(next, ex);
next.SetException(ex); next.SetExceptionAndComplete(ex, this);
this.CompleteSyncOrAsync(next);
} }
} while (next != null); } while (next != null);
} }
...@@ -603,9 +602,19 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me ...@@ -603,9 +602,19 @@ internal WriteResult WriteMessageTakingWriteLock(PhysicalConnection physical, Me
{ {
result = WriteMessageToServerInsideWriteLock(physical, message); result = WriteMessageToServerInsideWriteLock(physical, message);
} }
if (result == WriteResult.Success)
{
result = physical.FlushSync(); result = physical.FlushSync();
}
physical.SetIdle(); physical.SetIdle();
} }
catch (Exception ex)
{
var inner = new RedisConnectionException(ConnectionFailureType.InternalFailure, "Failed to write", ex);
message.SetExceptionAndComplete(inner, this);
result = WriteResult.WriteFailure;
}
finally finally
{ {
if (haveLock) if (haveLock)
......
...@@ -403,6 +403,7 @@ void add(string lk, string sk, string v) ...@@ -403,6 +403,7 @@ void add(string lk, string sk, string v)
if (next.Command == RedisCommand.QUIT && next.TrySetResult(true)) if (next.Command == RedisCommand.QUIT && next.TrySetResult(true))
{ {
// fine, death of a socket is close enough // fine, death of a socket is close enough
bridge.CompleteSyncOrAsync(next);
} }
else else
{ {
...@@ -412,9 +413,8 @@ void add(string lk, string sk, string v) ...@@ -412,9 +413,8 @@ void add(string lk, string sk, string v)
bridge.Trace("Failing: " + next); bridge.Trace("Failing: " + next);
bridge.Multiplexer?.OnMessageFaulted(next, ex, origin); bridge.Multiplexer?.OnMessageFaulted(next, ex, origin);
} }
next.SetException(ex); next.SetExceptionAndComplete(ex, bridge);
} }
bridge.CompleteSyncOrAsync(next);
} }
} }
...@@ -589,8 +589,7 @@ internal void OnBridgeHeartbeat() ...@@ -589,8 +589,7 @@ internal void OnBridgeHeartbeat()
{ {
var timeoutEx = ExceptionFactory.Timeout(includeDetail, $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server); var timeoutEx = ExceptionFactory.Timeout(includeDetail, $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server);
bridge.Multiplexer?.OnMessageFaulted(msg, timeoutEx); bridge.Multiplexer?.OnMessageFaulted(msg, timeoutEx);
msg.SetException(timeoutEx); // tell the message that it is doomed msg.SetExceptionAndComplete(timeoutEx, bridge); // tell the message that it is doomed
bridge.CompleteSyncOrAsync(msg); // prod it - kicks off async continuations etc
bridge.Multiplexer.OnAsyncTimeout(); bridge.Multiplexer.OnAsyncTimeout();
} }
// note: it is important that we **do not** remove the message unless we're tearing down the socket; that // note: it is important that we **do not** remove the message unless we're tearing down the socket; that
......
...@@ -198,6 +198,19 @@ public TransactionMessage(int db, CommandFlags flags, List<ConditionResult> cond ...@@ -198,6 +198,19 @@ public TransactionMessage(int db, CommandFlags flags, List<ConditionResult> cond
this.conditions = (conditions == null || conditions.Count == 0) ? Array.Empty<ConditionResult>(): conditions.ToArray(); this.conditions = (conditions == null || conditions.Count == 0) ? Array.Empty<ConditionResult>(): conditions.ToArray();
} }
internal override void SetExceptionAndComplete(Exception exception, PhysicalBridge bridge)
{
var inner = InnerOperations;
if (inner != null)
{
for(int i = 0; i < inner.Length;i++)
{
inner[i].Wrapped.SetExceptionAndComplete(exception, bridge);
}
}
base.SetExceptionAndComplete(exception, bridge);
}
public bool IsAborted => command != RedisCommand.EXEC; public bool IsAborted => command != RedisCommand.EXEC;
public override void AppendStormLog(StringBuilder sb) public override void AppendStormLog(StringBuilder sb)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment