Commit 21704849 authored by Marc Gravell's avatar Marc Gravell

try pipelines v2 - now with "ref T Current"

parent 4ebacd62
...@@ -302,9 +302,8 @@ static GeoPosition AsGeoPosition(Sequence<RawResult> coords) ...@@ -302,9 +302,8 @@ static GeoPosition AsGeoPosition(Sequence<RawResult> coords)
} }
else else
{ {
var iter = coords.GetEnumerator(); longitude = (double)coords[0].AsRedisValue();
longitude = (double)iter.GetNext().AsRedisValue(); latitude = (double)coords[1].AsRedisValue();
latitude = (double)iter.GetNext().AsRedisValue();
} }
return new GeoPosition(longitude, latitude); return new GeoPosition(longitude, latitude);
......
...@@ -61,10 +61,9 @@ internal static RedisResult TryCreate(PhysicalConnection connection, in RawResul ...@@ -61,10 +61,9 @@ internal static RedisResult TryCreate(PhysicalConnection connection, in RawResul
if (items.Length == 0) return EmptyArray; if (items.Length == 0) return EmptyArray;
var arr = new RedisResult[items.Length]; var arr = new RedisResult[items.Length];
int i = 0; int i = 0;
var iter = items.GetEnumerator(); foreach(ref RawResult item in items)
while (iter.MoveNext())
{ {
var next = TryCreate(connection, in iter.CurrentReference); var next = TryCreate(connection, in item);
if (next == null) return null; // means we didn't understand if (next == null) return null; // means we didn't understand
arr[i++] = next; arr[i++] = next;
} }
......
...@@ -505,12 +505,11 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -505,12 +505,11 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"processing {arr.Length} wrapped messages"); connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"processing {arr.Length} wrapped messages");
int i = 0; int i = 0;
var iter = arr.GetEnumerator(); foreach(ref RawResult item in arr)
while(iter.MoveNext())
{ {
var inner = wrapped[i++].Wrapped; var inner = wrapped[i++].Wrapped;
connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"> got {iter.Current} for {inner.CommandAndKey}"); connection?.BridgeCouldBeNull?.Multiplexer?.OnTransactionLog($"> got {item} for {inner.CommandAndKey}");
if (inner.ComputeResult(connection, iter.CurrentReference)) if (inner.ComputeResult(connection, in item))
{ {
inner.Complete(); inner.Complete();
} }
......
...@@ -719,9 +719,9 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -719,9 +719,9 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
var iter = result.GetItems().GetEnumerator(); var iter = result.GetItems().GetEnumerator();
while(iter.MoveNext()) while(iter.MoveNext())
{ {
ref RawResult key = ref iter.CurrentReference; ref RawResult key = ref iter.Current;
if (!iter.MoveNext()) break; if (!iter.MoveNext()) break;
ref RawResult val = ref iter.CurrentReference; ref RawResult val = ref iter.Current;
if (key.IsEqual(CommonReplies.timeout) && val.TryGetInt64(out long i64)) if (key.IsEqual(CommonReplies.timeout) && val.TryGetInt64(out long i64))
{ {
...@@ -1494,12 +1494,12 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -1494,12 +1494,12 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
var streams = result.GetItems().ToArray((in RawResult item, in MultiStreamProcessor obj) => var streams = result.GetItems().ToArray((in RawResult item, in MultiStreamProcessor obj) =>
{ {
var details = item.GetItems().GetEnumerator(); var details = item.GetItems();
// details[0] = Name of the Stream // details[0] = Name of the Stream
// details[1] = Multibulk Array of Stream Entries // details[1] = Multibulk Array of Stream Entries
return new RedisStream(key: details.GetNext().AsRedisKey(), return new RedisStream(key: details[0].AsRedisKey(),
entries: obj.ParseRedisStreamEntries(details.GetNext())); entries: obj.ParseRedisStreamEntries(details[1]));
}, this); }, this);
SetResult(message, streams); SetResult(message, streams);
...@@ -1705,11 +1705,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -1705,11 +1705,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{ {
consumers = third.ToArray((in RawResult item) => consumers = third.ToArray((in RawResult item) =>
{ {
var details = item.GetItems().GetEnumerator(); var details = item.GetItems();
return new StreamConsumer( return new StreamConsumer(
name: details.GetNext().AsRedisValue(), name: details[0].AsRedisValue(),
pendingMessageCount: (int)details.GetNext().AsRedisValue()); pendingMessageCount: (int)details[1].AsRedisValue());
}); });
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="1.1.14" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="1.9.3-gf6c121fbdb" />
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" /> <PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.5.1" /> <PackageReference Include="System.IO.Pipelines" Version="4.5.1" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" /> <PackageReference Include="System.Threading.Channels" Version="4.5.0" />
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment