Unverified Commit 74c5a2ac authored by Marc Gravell's avatar Marc Gravell Committed by GitHub

Pipelines race (#857)

* really nasty commit - incomplete investigating thread-race

* fixed race - was incomplete messages (packet fragmentation)
parent 3e9fee8b
...@@ -10,6 +10,11 @@ public static async Task Main() ...@@ -10,6 +10,11 @@ public static async Task Main()
{ {
using (var conn = await ConnectionMultiplexer.ConnectAsync("127.0.0.1:6379,syncTimeout=2000")) using (var conn = await ConnectionMultiplexer.ConnectAsync("127.0.0.1:6379,syncTimeout=2000"))
{ {
int expected = 0;
try
{
conn.ConnectionFailed += (sender, e) => Console.WriteLine($"{e.ConnectionType}, {e.FailureType}: {e.Exception.Message}"); conn.ConnectionFailed += (sender, e) => Console.WriteLine($"{e.ConnectionType}, {e.FailureType}: {e.Exception.Message}");
var db = conn.GetDatabase(3); var db = conn.GetDatabase(3);
...@@ -26,17 +31,27 @@ public static async Task Main() ...@@ -26,17 +31,27 @@ public static async Task Main()
var rand = new Random(12345); var rand = new Random(12345);
RedisKey counter = "counter"; RedisKey counter = "counter";
db.KeyDelete(counter, CommandFlags.FireAndForget); db.KeyDelete(counter, CommandFlags.FireAndForget);
int expected = 0;
for (int i = 0; i < 1000; i++) for (int i = 0; i < 1000; i++)
{ {
int x = rand.Next(50); int x = rand.Next(50);
Console.WriteLine($"{i}:{x}"); //Console.WriteLine($"{i}:{x}");
expected += x; expected += x;
db.StringIncrement(counter, x); //, CommandFlags.FireAndForget); db.StringIncrement(counter, x); //, CommandFlags.FireAndForget);
} }
int actual = (int)await db.StringGetAsync(counter); int actual = (int)await db.StringGetAsync(counter);
Console.WriteLine($"{expected} vs {actual}"); Console.WriteLine($"{expected} vs {actual}");
} }
catch (Exception ex)
{
Console.WriteLine($"expected when fail: {expected}");
Console.WriteLine(ex.Message);
}
finally
{
Console.WriteLine("Press any key");
Console.ReadKey();
}
}
} }
} }
} }
...@@ -982,7 +982,9 @@ void ISocketCallback.OnHeartbeat() ...@@ -982,7 +982,9 @@ void ISocketCallback.OnHeartbeat()
{ {
while (true) while (true)
{ {
var input = _ioPipe.Input; var input = _ioPipe?.Input;
if (input == null) break;
var readResult = await input.ReadAsync(); var readResult = await input.ReadAsync();
if (readResult.IsCompleted && readResult.Buffer.IsEmpty) if (readResult.IsCompleted && readResult.Buffer.IsEmpty)
{ {
...@@ -990,9 +992,12 @@ void ISocketCallback.OnHeartbeat() ...@@ -990,9 +992,12 @@ void ISocketCallback.OnHeartbeat()
} }
var buffer = readResult.Buffer; var buffer = readResult.Buffer;
int handled = ProcessBuffer(in buffer, out var consumed); var s = new RawResult(ResultType.BulkString, buffer, false).GetString().Replace("\r","\\r").Replace("\n","\\n");
int handled = ProcessBuffer(ref buffer); // updates buffer.Start
Multiplexer.Trace($"Processed {handled} messages", physicalName); Multiplexer.Trace($"Processed {handled} messages", physicalName);
input.AdvanceTo(buffer.GetPosition(consumed), buffer.End); input.AdvanceTo(buffer.Start, buffer.End);
} }
Multiplexer.Trace("EOF", physicalName); Multiplexer.Trace("EOF", physicalName);
RecordConnectionFailed(ConnectionFailureType.SocketClosed); RecordConnectionFailed(ConnectionFailureType.SocketClosed);
...@@ -1003,27 +1008,28 @@ void ISocketCallback.OnHeartbeat() ...@@ -1003,27 +1008,28 @@ void ISocketCallback.OnHeartbeat()
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
} }
} }
private int ProcessBuffer(in ReadOnlySequence<byte> entireBuffer, out long consumed)
private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
{ {
int messageCount = 0; int messageCount = 0;
var remainingBuffer = entireBuffer; // create a snapshot so we can trim it after each decoded message
// (so that slicing later doesn't require us to keep skipping segments)
consumed = 0; while (!buffer.IsEmpty)
while (!remainingBuffer.IsEmpty)
{ {
var reader = new BufferReader(remainingBuffer); var reader = new BufferReader(buffer);
var result = TryParseResult(in remainingBuffer, ref reader); var result = TryParseResult(in buffer, ref reader);
if (result.HasValue) if (result.HasValue)
{ {
consumed += reader.TotalConsumed; buffer = buffer.Slice(reader.TotalConsumed);
remainingBuffer = remainingBuffer.Slice(reader.TotalConsumed);
messageCount++; messageCount++;
Multiplexer.Trace(result.ToString(), physicalName); Multiplexer.Trace(result.ToString(), physicalName);
MatchResult(result); MatchResult(result);
} }
else
{
break; // remaining buffer isn't enough; give up
}
} }
return messageCount; return messageCount;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment