Commit 7a7647a5 authored by Marc Gravell's avatar Marc Gravell

Merge branch 'pipelines' into remove-preserve-order

parents 6c8cf01d f2cf3a18
......@@ -87,6 +87,8 @@ public void CheckDatabaseMethodsUseKeys(Type type)
case nameof(IDatabaseAsync.ExecuteAsync):
case nameof(IDatabase.ScriptEvaluate):
case nameof(IDatabaseAsync.ScriptEvaluateAsync):
case nameof(IDatabase.StreamRead):
case nameof(IDatabase.StreamReadAsync):
continue; // they're fine, but don't want to widen check to return type
}
......@@ -214,7 +216,8 @@ private void CheckMethod(MethodInfo method, bool isAsync)
|| shortName.StartsWith("Set")
|| shortName.StartsWith("Script")
|| shortName.StartsWith("SortedSet")
|| shortName.StartsWith("String")
|| shortName.StartsWith("String")
|| shortName.StartsWith("Stream")
, fullName + ":Prefix");
}
......
using System;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
......@@ -481,7 +482,7 @@ public void StreamConsumerGroupViewPendingInfoSummary()
}
[Fact]
public void StreamConsumerGroupViewPendingMessageInfo()
public async Task StreamConsumerGroupViewPendingMessageInfo()
{
var key = GetUniqueKey("group_pending_messages");
var groupName = "test_group";
......@@ -507,6 +508,8 @@ public void StreamConsumerGroupViewPendingMessageInfo()
// Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
await Task.Delay(10);
// Get the pending info about the messages themselves.
var pendingMessageInfoList = db.StreamPendingMessages(key, groupName, 10, RedisValue.Null);
......
......@@ -662,9 +662,9 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue
return Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
}
public RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
public RedisStreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRange(ToInner(key), minId, maxId, count, order, flags);
return Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags);
}
public RedisStreamEntry[] StreamRead(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None)
......
......@@ -641,9 +641,9 @@ public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key,
return Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
}
public Task<RedisStreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
public Task<RedisStreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
return Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, order, flags);
return Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags);
}
public Task<RedisStreamEntry[]> StreamReadAsync(RedisKey key, RedisValue afterId, int? count = null, CommandFlags flags = CommandFlags.None)
......
......@@ -118,7 +118,7 @@ public void Dispose()
{
var ioPipe = _ioPipe;
_ioPipe = null;
if(ioPipe != null)
if (ioPipe != null)
{
Multiplexer.Trace("Disconnecting...", physicalName);
try { ioPipe.Input?.CancelPendingRead(); } catch { }
......@@ -424,7 +424,7 @@ internal void Write(RedisChannel channel)
internal void Write(RedisValue value)
{
switch(value.Type)
switch (value.Type)
{
case RedisValue.StorageType.Null:
WriteUnified(_ioPipe.Output, (byte[])null);
......@@ -681,7 +681,7 @@ internal void WriteSha1AsHex(byte[] value)
{
writer.Write(NullBulkString);
}
else if(value.Length == ResultProcessor.ScriptLoadProcessor.Sha1HashLength)
else if (value.Length == ResultProcessor.ScriptLoadProcessor.Sha1HashLength)
{
// $40\r\n = 5
// {40 bytes}\r\n = 42
......@@ -694,7 +694,7 @@ internal void WriteSha1AsHex(byte[] value)
span[4] = (byte)'\n';
int offset = 5;
for(int i = 0; i < value.Length; i++)
for (int i = 0; i < value.Length; i++)
{
var b = value[i];
span[offset++] = ToHexNibble(value[i] >> 4);
......@@ -783,14 +783,14 @@ private unsafe void WriteRaw(PipeWriter writer, string value, int expectedLength
bool completed;
fixed (byte* bPtr = &MemoryMarshal.GetReference(span))
{
outEncoder.Convert(cPtr + charOffset, span.Length, bPtr, span.Length, final, out charsUsed, out bytesUsed, out completed);
outEncoder.Convert(cPtr + charOffset, charsRemaining, bPtr, span.Length, final, out charsUsed, out bytesUsed, out completed);
}
writer.Advance(bytesUsed);
totalBytes += bytesUsed;
charOffset += charsUsed;
charsRemaining -= charsUsed;
if(charsRemaining <= 0)
if (charsRemaining <= 0)
{
if (charsRemaining < 0) throw new InvalidOperationException("String encode went negative");
if (completed) break; // fine
......@@ -1035,18 +1035,18 @@ void ISocketCallback.OnHeartbeat()
// note: TryRead will give us back the same buffer in a tight loop
// - so: only use that if we're making progress
if(!(allowSyncRead && input.TryRead(out var readResult)))
if (!(allowSyncRead && input.TryRead(out var readResult)))
{
readResult = await input.ReadAsync();
}
var buffer = readResult.Buffer;
int handled = 0;
if(!buffer.IsEmpty)
if (!buffer.IsEmpty)
{
handled = ProcessBuffer(ref buffer); // updates buffer.Start
}
allowSyncRead = handled != 0;
Multiplexer.Trace($"Processed {handled} messages", physicalName);
......@@ -1177,9 +1177,9 @@ private RawResult ReadBulkString(in ReadOnlySequence<byte> buffer, ref BufferRea
return new RawResult(ResultType.BulkString, ReadOnlySequence<byte>.Empty, true);
}
if(reader.TryConsumeAsBuffer(bodySize, out var payload))
if (reader.TryConsumeAsBuffer(bodySize, out var payload))
{
switch(reader.TryConsumeCRLF())
switch (reader.TryConsumeCRLF())
{
case ConsumeResult.NeedMoreData:
break; // see NilResult below
......@@ -1285,7 +1285,7 @@ public BufferReader(ReadOnlySequence<byte> buffer)
/// </summary>
public ConsumeResult TryConsumeCRLF()
{
switch(RemainingThisSpan)
switch (RemainingThisSpan)
{
case 0:
return ConsumeResult.NeedMoreData;
......@@ -1328,13 +1328,13 @@ public bool TryConsume(int count)
return false;
}
readonly ReadOnlySequence<byte> _buffer;
SequencePosition _lastSnapshotPosition;
long _lastSnapshotBytes;
private readonly ReadOnlySequence<byte> _buffer;
private SequencePosition _lastSnapshotPosition;
private long _lastSnapshotBytes;
// makes an internal note of where we are, as a SequencePosition; useful
// to avoid having to use buffer.Slice on huge ranges
SequencePosition SnapshotPosition()
private SequencePosition SnapshotPosition()
{
var consumed = TotalConsumed;
var delta = consumed - _lastSnapshotBytes;
......@@ -1346,7 +1346,7 @@ SequencePosition SnapshotPosition()
}
public ReadOnlySequence<byte> ConsumeAsBuffer(int count)
{
if(!TryConsumeAsBuffer(count, out var buffer)) throw new EndOfStreamException();
if (!TryConsumeAsBuffer(count, out var buffer)) throw new EndOfStreamException();
return buffer;
}
......@@ -1380,7 +1380,7 @@ public void Consume(int count)
var span = reader.SlicedSpan;
if (haveTrailingCR)
{
if(span[0] == '\n') return totalSkipped - 1;
if (span[0] == '\n') return totalSkipped - 1;
haveTrailingCR = false;
}
......
......@@ -655,7 +655,7 @@ private sealed class KeyMigrateCommandMessage : Message.CommandKeyBase // MIGRAT
private readonly MigrateOptions migrateOptions;
private readonly int timeoutMilliseconds;
private readonly int toDatabase;
private RedisValue toHost, toPort;
private readonly RedisValue toHost, toPort;
public KeyMigrateCommandMessage(int db, RedisKey key, EndPoint toServer, int toDatabase, int timeoutMilliseconds, MigrateOptions migrateOptions, CommandFlags flags)
: base(db, flags, RedisCommand.MIGRATE, key)
......
using System;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;
......@@ -607,7 +608,7 @@ private static string ToHex(ReadOnlySpan<byte> src)
const string HexValues = "0123456789ABCDEF";
if (src.IsEmpty) return "";
var s = new string((char)0, src.Length * 3 - 1);
var s = new string((char)0, (src.Length * 3) - 1);
var dst = MemoryMarshal.AsMemory(s.AsMemory()).Span;
int i = 0;
......@@ -742,5 +743,24 @@ private RedisValue Simplify()
}
return this;
}
/// <summary>
/// Create a RedisValue from a MemoryStream; it will *attempt* to use the internal buffer
/// directly, but if this isn't possibly it will fallback to ToArray
/// </summary>
public static RedisValue CreateFrom(MemoryStream stream)
{
if (stream == null) return Null;
if (stream.Length == 0) return Array.Empty<byte>();
if(stream.TryGetBuffer(out var segment))
{
return new Memory<byte>(segment.Array, segment.Offset, segment.Count);
}
else
{
// nowhere near as efficient, but...
return stream.ToArray();
}
}
}
}
......@@ -1334,7 +1334,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
internal sealed class SingleStreamProcessor : StreamProcessorBase<RedisStreamEntry[]>
{
private bool skipStreamName;
private readonly bool skipStreamName;
public SingleStreamProcessor(bool skipStreamName = false)
{
......@@ -1455,7 +1455,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
return true;
}
}
static T[] ConvertAll<T>(ReadOnlySpan<RawResult> items, Func<RawResult, T> converter)
private static T[] ConvertAll<T>(ReadOnlySpan<RawResult> items, Func<RawResult, T> converter)
{
if (items.Length == 0) return Array.Empty<T>();
T[] arr = new T[items.Length];
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment