Commit b3c880b3 authored by Marc Gravell's avatar Marc Gravell

fix length vs position error in ReadLineTerminatedString

parent b0637c6f
...@@ -1140,15 +1140,13 @@ private RawResult ReadBulkString(in ReadOnlySequence<byte> buffer, ref BufferRea ...@@ -1140,15 +1140,13 @@ private RawResult ReadBulkString(in ReadOnlySequence<byte> buffer, ref BufferRea
private RawResult ReadLineTerminatedString(ResultType type, in ReadOnlySequence<byte> buffer, ref BufferReader reader) private RawResult ReadLineTerminatedString(ResultType type, in ReadOnlySequence<byte> buffer, ref BufferReader reader)
{ {
int crlf = BufferReader.FindNextCrLf(reader); int crlfOffsetFromCurrent = BufferReader.FindNextCrLf(reader);
if (crlfOffsetFromCurrent < 0) return RawResult.Nil;
if (crlf < 0) return RawResult.Nil; var payload = buffer.Slice(reader.TotalConsumed, crlfOffsetFromCurrent);
reader.Consume(crlfOffsetFromCurrent + 2);
return new RawResult(type, payload, false);
var inner = buffer.Slice(reader.TotalConsumed, crlf);
reader.Consume(crlf + 2);
return new RawResult(type, inner, false);
} }
void ISocketCallback.StartReading() => ReadFromPipe(); void ISocketCallback.StartReading() => ReadFromPipe();
...@@ -1185,7 +1183,9 @@ public enum ConsumeResult ...@@ -1185,7 +1183,9 @@ public enum ConsumeResult
private ReadOnlySequence<byte>.Enumerator _iterator; private ReadOnlySequence<byte>.Enumerator _iterator;
private ReadOnlySpan<byte> _current; private ReadOnlySpan<byte> _current;
public ReadOnlySpan<byte> Span => _current; public ReadOnlySpan<byte> OversizedSpan => _current;
public ReadOnlySpan<byte> SlicedSpan => _current.Slice(OffsetThisSpan, RemainingThisSpan);
public int OffsetThisSpan { get; private set; } public int OffsetThisSpan { get; private set; }
public int TotalConsumed { get; private set; } public int TotalConsumed { get; private set; }
public int RemainingThisSpan { get; private set; } public int RemainingThisSpan { get; private set; }
...@@ -1280,23 +1280,20 @@ public void Consume(int count) ...@@ -1280,23 +1280,20 @@ public void Consume(int count)
bool haveTrailingCR = false; bool haveTrailingCR = false;
do do
{ {
var span = reader.Span; if (reader.RemainingThisSpan == 0) continue;
if (reader.OffsetThisSpan != 0) span = span.Slice(reader.OffsetThisSpan);
var span = reader.SlicedSpan;
if (span.IsEmpty) if (haveTrailingCR)
{ {
if(span[0] == '\n') return totalSkipped - 1;
haveTrailingCR = false; haveTrailingCR = false;
} }
else
{
if (haveTrailingCR && span[0] == '\n') return totalSkipped - 1;
int found = span.IndexOf(CRLF); int found = span.IndexOf(CRLF);
if (found >= 0) return totalSkipped + found; if (found >= 0) return totalSkipped + found;
haveTrailingCR = span[span.Length - 1] == '\r'; haveTrailingCR = span[span.Length - 1] == '\r';
totalSkipped += span.Length; totalSkipped += span.Length;
}
} }
while (reader.FetchNextSegment()); while (reader.FetchNextSegment());
return -1; return -1;
......
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.IO.Pipelines;
using System.Linq;
using StackExchange.Redis; using StackExchange.Redis;
class Program class Program
{ {
static int Main() static int Main()
{ {
var options = PipeOptions.Default;
Console.WriteLine(options.PauseWriterThreshold);
Console.WriteLine(options.ResumeWriterThreshold);
var s = new StringWriter(); var s = new StringWriter();
try try
{ {
#if DEBUG #if DEBUG
Pipelines.Sockets.Unofficial.DebugCounters.SetLog(Console.Out); // Pipelines.Sockets.Unofficial.DebugCounters.SetLog(Console.Out);
#endif #endif
// it is sometimes hard to get the debugger to play nicely with benchmarkdotnet/xunit attached,
// so this is just a *trivial* exe
var config = new ConfigurationOptions var config = new ConfigurationOptions
{ {
EndPoints = { "127.0.0.1" }, EndPoints = { "127.0.0.1" },
//TieBreaker = "",
//CommandMap = CommandMap.Create(new Dictionary<string, string>
//{
// ["SUBSCRIBE"] = null,
// ["PSUBSCRIBE"] = null,
//})
}; };
using (var conn = ConnectionMultiplexer.Connect(config, log: s)) using (var conn = ConnectionMultiplexer.Connect(config, log: s))
{ {
Console.WriteLine("Connected"); Execute(conn);
var db = conn.GetDatabase();
db.StringSet("abc", "def");
var x = db.StringGet("abc");
Console.WriteLine(x);
//for (int i = 0; i < 10; i++)
//{
// Console.WriteLine($"Ping {i}");
// db.Ping();
//}
} }
Console.WriteLine("Clean exit"); Console.WriteLine("Clean exit");
return 0; return 0;
...@@ -49,8 +38,23 @@ static int Main() ...@@ -49,8 +38,23 @@ static int Main()
} }
finally finally
{ {
Console.WriteLine(); // Console.WriteLine();
Console.WriteLine(s); // Console.WriteLine(s);
} }
} }
private static void Execute(ConnectionMultiplexer conn)
{
int pageSize = 100;
RedisKey key = nameof(Execute);
var db = conn.GetDatabase();
db.KeyDelete(key);
for (int i = 0; i < 2000; i++)
db.SetAdd(key, "s" + i, flags: CommandFlags.FireAndForget);
int count = db.SetScan(key, pageSize: pageSize).Count();
Console.WriteLine(count == 2000 ? "Pass" : "Fail");
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment