Commit b854e4d7 authored by Nick Craver's avatar Nick Craver

Cleanup: RedisTransaction

parent 4b03e8d2
...@@ -46,15 +46,13 @@ public void Execute() ...@@ -46,15 +46,13 @@ public void Execute()
public bool Execute(CommandFlags flags) public bool Execute(CommandFlags flags)
{ {
ResultProcessor<bool> proc; var msg = CreateMessage(flags, out ResultProcessor<bool> proc);
var msg = CreateMessage(flags, out proc);
return base.ExecuteSync(msg, proc); // need base to avoid our local "not supported" override return base.ExecuteSync(msg, proc); // need base to avoid our local "not supported" override
} }
public Task<bool> ExecuteAsync(CommandFlags flags) public Task<bool> ExecuteAsync(CommandFlags flags)
{ {
ResultProcessor<bool> proc; var msg = CreateMessage(flags, out ResultProcessor<bool> proc);
var msg = CreateMessage(flags, out proc);
return base.ExecuteAsync(msg, proc); // need base to avoid our local wrapping override return base.ExecuteAsync(msg, proc); // need base to avoid our local wrapping override
} }
...@@ -87,8 +85,7 @@ internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> pr ...@@ -87,8 +85,7 @@ internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> pr
// (there is no task for the inner command) // (there is no task for the inner command)
(pending ?? (pending = new List<QueuedMessage>())).Add(queued); (pending ?? (pending = new List<QueuedMessage>())).Add(queued);
switch (message.Command)
switch(message.Command)
{ {
case RedisCommand.UNKNOWN: case RedisCommand.UNKNOWN:
case RedisCommand.EVAL: case RedisCommand.EVAL:
...@@ -110,6 +107,7 @@ internal override T ExecuteSync<T>(Message message, ResultProcessor<T> processor ...@@ -110,6 +107,7 @@ internal override T ExecuteSync<T>(Message message, ResultProcessor<T> processor
{ {
throw new NotSupportedException("ExecuteSync cannot be used inside a transaction"); throw new NotSupportedException("ExecuteSync cannot be used inside a transaction");
} }
private Message CreateMessage(CommandFlags flags, out ResultProcessor<bool> processor) private Message CreateMessage(CommandFlags flags, out ResultProcessor<bool> processor)
{ {
var work = pending; var work = pending;
...@@ -130,87 +128,84 @@ private Message CreateMessage(CommandFlags flags, out ResultProcessor<bool> proc ...@@ -130,87 +128,84 @@ private Message CreateMessage(CommandFlags flags, out ResultProcessor<bool> proc
processor = TransactionProcessor.Default; processor = TransactionProcessor.Default;
return new TransactionMessage(Database, flags, cond, work); return new TransactionMessage(Database, flags, cond, work);
} }
class QueuedMessage : Message
private class QueuedMessage : Message
{ {
private readonly Message wrapped; public Message Wrapped { get; }
private volatile bool wasQueued; private volatile bool wasQueued;
public QueuedMessage(Message message) : base(message.Db, message.Flags | CommandFlags.NoRedirect, message.Command) public QueuedMessage(Message message) : base(message.Db, message.Flags | CommandFlags.NoRedirect, message.Command)
{ {
message.SetNoRedirect(); message.SetNoRedirect();
this.wrapped = message; Wrapped = message;
} }
public bool WasQueued public bool WasQueued
{ {
get { return wasQueued; } get => wasQueued;
set { wasQueued = value; } set => wasQueued = value;
} }
public Message Wrapped => wrapped;
internal override void WriteImpl(PhysicalConnection physical) internal override void WriteImpl(PhysicalConnection physical)
{ {
wrapped.WriteImpl(physical); Wrapped.WriteImpl(physical);
wrapped.SetRequestSent(); Wrapped.SetRequestSent();
} }
} }
class QueuedProcessor : ResultProcessor<bool> private class QueuedProcessor : ResultProcessor<bool>
{ {
public static readonly ResultProcessor<bool> Default = new QueuedProcessor(); public static readonly ResultProcessor<bool> Default = new QueuedProcessor();
static readonly byte[] QUEUED = Encoding.UTF8.GetBytes("QUEUED"); private static readonly byte[] QUEUED = Encoding.UTF8.GetBytes("QUEUED");
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{ {
if(result.Type == ResultType.SimpleString && result.IsEqual(QUEUED)) if (result.Type == ResultType.SimpleString && result.IsEqual(QUEUED))
{
if (message is QueuedMessage q)
{ {
var q = message as QueuedMessage; q.WasQueued = true;
if (q != null) q.WasQueued = true; }
return true; return true;
} }
return false; return false;
} }
} }
class TransactionMessage : Message, IMultiMessage
{
static readonly ConditionResult[] NixConditions = new ConditionResult[0];
static readonly QueuedMessage[] NixMessages = new QueuedMessage[0];
private ConditionResult[] conditions; private class TransactionMessage : Message, IMultiMessage
{
private QueuedMessage[] operations; private static readonly ConditionResult[] NixConditions = new ConditionResult[0];
private static readonly QueuedMessage[] NixMessages = new QueuedMessage[0];
private readonly ConditionResult[] conditions;
public QueuedMessage[] InnerOperations { get; }
public TransactionMessage(int db, CommandFlags flags, List<ConditionResult> conditions, List<QueuedMessage> operations) public TransactionMessage(int db, CommandFlags flags, List<ConditionResult> conditions, List<QueuedMessage> operations)
: base(db, flags, RedisCommand.EXEC) : base(db, flags, RedisCommand.EXEC)
{ {
this.operations = (operations == null || operations.Count == 0) ? NixMessages : operations.ToArray(); this.InnerOperations = (operations == null || operations.Count == 0) ? NixMessages : operations.ToArray();
this.conditions = (conditions == null || conditions.Count == 0) ? NixConditions : conditions.ToArray(); this.conditions = (conditions == null || conditions.Count == 0) ? NixConditions : conditions.ToArray();
} }
public QueuedMessage[] InnerOperations => operations;
public bool IsAborted => command != RedisCommand.EXEC; public bool IsAborted => command != RedisCommand.EXEC;
public override void AppendStormLog(StringBuilder sb) public override void AppendStormLog(StringBuilder sb)
{ {
base.AppendStormLog(sb); base.AppendStormLog(sb);
if (conditions.Length != 0) sb.Append(", ").Append(conditions.Length).Append(" conditions"); if (conditions.Length != 0) sb.Append(", ").Append(conditions.Length).Append(" conditions");
sb.Append(", ").Append(operations.Length).Append(" operations"); sb.Append(", ").Append(InnerOperations.Length).Append(" operations");
} }
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
{ {
int slot = ServerSelectionStrategy.NoSlot; int slot = ServerSelectionStrategy.NoSlot;
for(int i = 0; i < conditions.Length;i++) for (int i = 0; i < conditions.Length; i++)
{ {
int newSlot = conditions[i].Condition.GetHashSlot(serverSelectionStrategy); int newSlot = conditions[i].Condition.GetHashSlot(serverSelectionStrategy);
slot = serverSelectionStrategy.CombineSlot(slot, newSlot); slot = serverSelectionStrategy.CombineSlot(slot, newSlot);
if (slot == ServerSelectionStrategy.MultipleSlots) return slot; if (slot == ServerSelectionStrategy.MultipleSlots) return slot;
} }
for(int i = 0; i < operations.Length;i++) for (int i = 0; i < InnerOperations.Length; i++)
{ {
int newSlot = operations[i].Wrapped.GetHashSlot(serverSelectionStrategy); int newSlot = InnerOperations[i].Wrapped.GetHashSlot(serverSelectionStrategy);
slot = serverSelectionStrategy.CombineSlot(slot, newSlot); slot = serverSelectionStrategy.CombineSlot(slot, newSlot);
if (slot == ServerSelectionStrategy.MultipleSlots) return slot; if (slot == ServerSelectionStrategy.MultipleSlots) return slot;
} }
...@@ -279,11 +274,11 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -279,11 +274,11 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
} }
// PART 3: issue the commands // PART 3: issue the commands
if (!IsAborted && operations.Length != 0) if (!IsAborted && InnerOperations.Length != 0)
{ {
multiplexer.Trace("Issuing transaction operations"); multiplexer.Trace("Issuing transaction operations");
foreach (var op in operations) foreach (var op in InnerOperations)
{ {
if (explicitCheckForQueued) if (explicitCheckForQueued)
{ // need to have locked them before sending them { // need to have locked them before sending them
...@@ -311,7 +306,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -311,7 +306,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
} }
else else
{ {
foreach (var op in operations) foreach (var op in InnerOperations)
{ {
if (!op.WasQueued) if (!op.WasQueued)
{ {
...@@ -321,7 +316,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -321,7 +316,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
} }
} }
} }
multiplexer.Trace("Confirmed: QUEUED x " + operations.Length); multiplexer.Trace("Confirmed: QUEUED x " + InnerOperations.Length);
} }
else else
{ {
...@@ -341,7 +336,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection) ...@@ -341,7 +336,7 @@ public IEnumerable<Message> GetMessages(PhysicalConnection connection)
{ {
connection.Multiplexer.Trace("Aborting: canceling wrapped messages"); connection.Multiplexer.Trace("Aborting: canceling wrapped messages");
var bridge = connection.Bridge; var bridge = connection.Bridge;
foreach (var op in operations) foreach (var op in InnerOperations)
{ {
op.Wrapped.Cancel(); op.Wrapped.Cancel();
bridge.CompleteSyncOrAsync(op.Wrapped); bridge.CompleteSyncOrAsync(op.Wrapped);
...@@ -376,32 +371,28 @@ private bool AreAllConditionsSatisfied(ConnectionMultiplexer multiplexer) ...@@ -376,32 +371,28 @@ private bool AreAllConditionsSatisfied(ConnectionMultiplexer multiplexer)
} }
} }
class TransactionProcessor : ResultProcessor<bool> private class TransactionProcessor : ResultProcessor<bool>
{ {
public static readonly TransactionProcessor Default = new TransactionProcessor(); public static readonly TransactionProcessor Default = new TransactionProcessor();
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result) public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{ {
if (result.IsError) if (result.IsError && message is TransactionMessage tran)
{
var tran = message as TransactionMessage;
if (tran != null)
{ {
string error = result.GetString(); string error = result.GetString();
var bridge = connection.Bridge; var bridge = connection.Bridge;
foreach(var op in tran.InnerOperations) foreach (var op in tran.InnerOperations)
{ {
ServerFail(op.Wrapped, error); ServerFail(op.Wrapped, error);
bridge.CompleteSyncOrAsync(op.Wrapped); bridge.CompleteSyncOrAsync(op.Wrapped);
} }
} }
}
return base.SetResult(connection, message, result); return base.SetResult(connection, message, result);
} }
protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result) protected override bool SetResultCore(PhysicalConnection connection, Message message, RawResult result)
{ {
var tran = message as TransactionMessage; if (message is TransactionMessage tran)
if (tran != null)
{ {
var bridge = connection.Bridge; var bridge = connection.Bridge;
var wrapped = tran.InnerOperations; var wrapped = tran.InnerOperations;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment