Commit 92a4a245 authored by Marc Gravell's avatar Marc Gravell

Remove StreamExtensions - prefer direct ReadAsync etc; provide direct dnx451 comparison

parent af605dd6
...@@ -26,13 +26,31 @@ internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback ...@@ -26,13 +26,31 @@ internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback
private static readonly byte[] Crlf = Encoding.ASCII.GetBytes("\r\n"); private static readonly byte[] Crlf = Encoding.ASCII.GetBytes("\r\n");
#if CORE_CLR
readonly Action<Task<int>> endRead;
private static Action<Task<int>> EndReadFactory(PhysicalConnection physical)
{
return result =>
{ // can't capture AsyncState on SocketRead, so we'll do it once per physical instead
try
{
physical.multiplexer.Trace("Completed asynchronously: processing in callback", physical.physicalName);
if (physical.EndReading(result)) physical.BeginReading();
}
catch (Exception ex)
{
physical.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}
};
}
#else
static readonly AsyncCallback endRead = result => static readonly AsyncCallback endRead = result =>
{ {
PhysicalConnection physical; PhysicalConnection physical;
if (result.CompletedSynchronously || (physical = result.AsyncState as PhysicalConnection) == null) return; if (result.CompletedSynchronously || (physical = result.AsyncState as PhysicalConnection) == null) return;
try try
{ {
physical.multiplexer.Trace("Completed synchronously: processing in callback", physical.physicalName); physical.multiplexer.Trace("Completed asynchronously: processing in callback", physical.physicalName);
if (physical.EndReading(result)) physical.BeginReading(); if (physical.EndReading(result)) physical.BeginReading();
} }
catch (Exception ex) catch (Exception ex)
...@@ -40,6 +58,7 @@ internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback ...@@ -40,6 +58,7 @@ internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback
physical.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); physical.RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
} }
}; };
#endif
private static readonly byte[] message = Encoding.UTF8.GetBytes("message"), pmessage = Encoding.UTF8.GetBytes("pmessage"); private static readonly byte[] message = Encoding.UTF8.GetBytes("message"), pmessage = Encoding.UTF8.GetBytes("pmessage");
...@@ -91,6 +110,9 @@ public PhysicalConnection(PhysicalBridge bridge) ...@@ -91,6 +110,9 @@ public PhysicalConnection(PhysicalBridge bridge)
var endpoint = bridge.ServerEndPoint.EndPoint; var endpoint = bridge.ServerEndPoint.EndPoint;
physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + Format.ToString(endpoint); physicalName = connectionType + "#" + Interlocked.Increment(ref totalCount) + "@" + Format.ToString(endpoint);
this.bridge = bridge; this.bridge = bridge;
#if CORE_CLR
endRead = EndReadFactory(this);
#endif
OnCreateEcho(); OnCreateEcho();
} }
...@@ -279,6 +301,7 @@ internal static void IdentifyFailureType(Exception exception, ref ConnectionFail ...@@ -279,6 +301,7 @@ internal static void IdentifyFailureType(Exception exception, ref ConnectionFail
{ {
if (exception != null && failureType == ConnectionFailureType.InternalFailure) if (exception != null && failureType == ConnectionFailureType.InternalFailure)
{ {
if (exception is AggregateException) exception = exception.InnerException ?? exception;
if (exception is AuthenticationException) failureType = ConnectionFailureType.AuthenticationFailure; if (exception is AuthenticationException) failureType = ConnectionFailureType.AuthenticationFailure;
else if (exception is SocketException || exception is IOException) failureType = ConnectionFailureType.SocketFailure; else if (exception is SocketException || exception is IOException) failureType = ConnectionFailureType.SocketFailure;
else if (exception is EndOfStreamException) failureType = ConnectionFailureType.SocketClosed; else if (exception is EndOfStreamException) failureType = ConnectionFailureType.SocketClosed;
...@@ -677,20 +700,27 @@ void BeginReading() ...@@ -677,20 +700,27 @@ void BeginReading()
keepReading = false; keepReading = false;
int space = EnsureSpaceAndComputeBytesToRead(); int space = EnsureSpaceAndComputeBytesToRead();
multiplexer.Trace("Beginning async read...", physicalName); multiplexer.Trace("Beginning async read...", physicalName);
var result = netStream.BeginRead(ioBuffer, ioBufferBytes, space, endRead, this);
#if CORE_CLR #if CORE_CLR
Task<int> t = (Task<int>)result; var result = netStream.ReadAsync(ioBuffer, ioBufferBytes, space);
if (t.Status == TaskStatus.RanToCompletion && t.Result == -1) switch(result.Status)
{ {
multiplexer.Trace("Could not connect: ", physicalName); case TaskStatus.RanToCompletion:
return; case TaskStatus.Faulted:
multiplexer.Trace("Completed synchronously: processing immediately", physicalName);
keepReading = EndReading(result);
break;
default:
result.ContinueWith(endRead);
break;
} }
#endif #else
var result = netStream.BeginRead(ioBuffer, ioBufferBytes, space, endRead, this);
if (result.CompletedSynchronously) if (result.CompletedSynchronously)
{ {
multiplexer.Trace("Completed synchronously: processing immediately", physicalName); multiplexer.Trace("Completed synchronously: processing immediately", physicalName);
keepReading = EndReading(result); keepReading = EndReading(result);
} }
#endif
} while (keepReading); } while (keepReading);
} }
#if CORE_CLR #if CORE_CLR
...@@ -794,6 +824,22 @@ SocketMode ISocketCallback.Connected(Stream stream, TextWriter log) ...@@ -794,6 +824,22 @@ SocketMode ISocketCallback.Connected(Stream stream, TextWriter log)
} }
} }
#if CORE_CLR
private bool EndReading(Task<int> result)
{
try
{
var tmp = netStream;
int bytesRead = tmp == null ? 0 : result.Result; // note we expect this to be completed
return ProcessReadBytes(bytesRead);
}
catch (Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
return false;
}
}
#else
private bool EndReading(IAsyncResult result) private bool EndReading(IAsyncResult result)
{ {
try try
...@@ -808,7 +854,7 @@ private bool EndReading(IAsyncResult result) ...@@ -808,7 +854,7 @@ private bool EndReading(IAsyncResult result)
return false; return false;
} }
} }
#endif
int EnsureSpaceAndComputeBytesToRead() int EnsureSpaceAndComputeBytesToRead()
{ {
int space = ioBuffer.Length - ioBufferBytes; int space = ioBuffer.Length - ioBufferBytes;
...@@ -1113,37 +1159,5 @@ public void CheckForStaleConnection(ref SocketManager.ManagerState managerState) ...@@ -1113,37 +1159,5 @@ public void CheckForStaleConnection(ref SocketManager.ManagerState managerState)
} }
} }
#if CORE_CLR
internal static class StreamExtensions
{
internal static IAsyncResult BeginRead(this Stream stream, byte[] buffer, int offset, int count, AsyncCallback ac, object state)
{
Task<int> f = Task<int>.Factory.StartNew(_ => {
try
{
return stream.Read(buffer, offset, count);
}
catch (IOException ex)
{
System.Diagnostics.Trace.WriteLine("Could not connect: " + ex.InnerException.Message);
return -1;
}
}, state);
if (ac != null) f.ContinueWith(res => ac(f));
return f;
}
internal static int EndRead(this Stream stream, IAsyncResult ar)
{
try
{
return ((Task<int>)ar).Result;
}
catch (AggregateException ex)
{
throw ex.InnerException;
}
}
}
#endif
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment