Commit cad9dacd authored by Marc Gravell's avatar Marc Gravell

update socket lib

parent 82a73263
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using StackExchange.Redis; using StackExchange.Redis;
...@@ -9,6 +10,7 @@ internal static class Program ...@@ -9,6 +10,7 @@ internal static class Program
{ {
public static async Task Main() public static async Task Main()
{ {
Thread.CurrentThread.Name = nameof(Main);
using (var conn = await ConnectionMultiplexer.ConnectAsync("127.0.0.1:6379,syncTimeout=2000")) using (var conn = await ConnectionMultiplexer.ConnectAsync("127.0.0.1:6379,syncTimeout=2000"))
{ {
int expected = 0; int expected = 0;
......
...@@ -31,6 +31,6 @@ ...@@ -31,6 +31,6 @@
<PackageReference Include="System.Memory" Version="$(CoreFxVersion)" /> <PackageReference Include="System.Memory" Version="$(CoreFxVersion)" />
<PackageReference Include="System.Buffers" Version="$(CoreFxVersion)" /> <PackageReference Include="System.Buffers" Version="$(CoreFxVersion)" />
<PackageReference Include="System.IO.Pipelines" Version="$(CoreFxVersion)" /> <PackageReference Include="System.IO.Pipelines" Version="$(CoreFxVersion)" />
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.0-alpha-005" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.0-alpha-007" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
...@@ -975,27 +975,34 @@ void ISocketCallback.OnHeartbeat() ...@@ -975,27 +975,34 @@ void ISocketCallback.OnHeartbeat()
} }
partial void OnWrapForLogging(ref IDuplexPipe pipe, string name); partial void OnWrapForLogging(ref IDuplexPipe pipe, string name);
private async void ReadFromPipe() // yes it is an async void; deal with it! private async void ReadFromPipe() // yes it is an async void; deal with it!
{ {
try try
{ {
bool allowSyncRead = true;
while (true) while (true)
{ {
var input = _ioPipe?.Input; var input = _ioPipe?.Input;
if (input == null) break; if (input == null) break;
var readResult = await input.ReadAsync(); // note: TryRead will give us back the same buffer in a tight loop
// - so: only use that if we're making progress
if(!(allowSyncRead && input.TryRead(out var readResult)))
{
readResult = await input.ReadAsync();
}
if (readResult.IsCompleted && readResult.Buffer.IsEmpty) if (readResult.IsCompleted && readResult.Buffer.IsEmpty)
{ {
break; // we're all done break; // we're all done
} }
var buffer = readResult.Buffer; var buffer = readResult.Buffer;
var s = new RawResult(ResultType.BulkString, buffer, false).GetString().Replace("\r","\\r").Replace("\n","\\n"); var s = new RawResult(ResultType.BulkString, buffer, false).GetString().Replace("\r","\\r").Replace("\n","\\n");
int handled = ProcessBuffer(ref buffer); // updates buffer.Start int handled = ProcessBuffer(ref buffer); // updates buffer.Start
allowSyncRead = handled != 0;
Multiplexer.Trace($"Processed {handled} messages", physicalName); Multiplexer.Trace($"Processed {handled} messages", physicalName);
input.AdvanceTo(buffer.Start, buffer.End); input.AdvanceTo(buffer.Start, buffer.End);
} }
......
...@@ -174,8 +174,12 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, C ...@@ -174,8 +174,12 @@ internal SocketToken BeginConnect(EndPoint endpoint, ISocketCallback callback, C
{ {
var formattedEndpoint = Format.ToString(endpoint); var formattedEndpoint = Format.ToString(endpoint);
SocketConnection.ConnectAsync(endpoint, PipeOptions, var t = SocketConnection.ConnectAsync(endpoint, PipeOptions,
conn => EndConnectAsync(conn, multiplexer, log, callback), socket); //SocketConnectionOptions.SyncReader | SocketConnectionOptions.SyncWriter,
SocketConnectionOptions.None,
onConnected: conn => EndConnectAsync(conn, multiplexer, log, callback),
socket: socket);
GC.KeepAlive(t); // make compiler happier
} }
catch (NotImplementedException ex) catch (NotImplementedException ex)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment