Commit f80be103 authored by Marc Gravell's avatar Marc Gravell

attempt to fix async cx flow

parent adbc96c0
...@@ -151,8 +151,11 @@ private void AssertOnMessage(Delegate handler) ...@@ -151,8 +151,11 @@ private void AssertOnMessage(Delegate handler)
public void OnMessage(Action<ChannelMessage> handler) public void OnMessage(Action<ChannelMessage> handler)
{ {
AssertOnMessage(handler); AssertOnMessage(handler);
ThreadPool.QueueUserWorkItem( using (ExecutionContext.SuppressFlow())
{
ThreadPool.QueueUserWorkItem(
state => ((ChannelMessageQueue)state).OnMessageSyncImpl(), this); state => ((ChannelMessageQueue)state).OnMessageSyncImpl(), this);
}
} }
private async void OnMessageSyncImpl() private async void OnMessageSyncImpl()
...@@ -181,8 +184,11 @@ private async void OnMessageSyncImpl() ...@@ -181,8 +184,11 @@ private async void OnMessageSyncImpl()
public void OnMessage(Func<ChannelMessage, Task> handler) public void OnMessage(Func<ChannelMessage, Task> handler)
{ {
AssertOnMessage(handler); AssertOnMessage(handler);
ThreadPool.QueueUserWorkItem( using (ExecutionContext.SuppressFlow())
{
ThreadPool.QueueUserWorkItem(
state => ((ChannelMessageQueue)state).OnMessageAsyncImpl(), this); state => ((ChannelMessageQueue)state).OnMessageAsyncImpl(), this);
}
} }
private async void OnMessageAsyncImpl() private async void OnMessageAsyncImpl()
......
...@@ -668,7 +668,11 @@ private PhysicalConnection GetConnection(TextWriter log) ...@@ -668,7 +668,11 @@ private PhysicalConnection GetConnection(TextWriter log)
// separate creation and connection for case when connection completes synchronously // separate creation and connection for case when connection completes synchronously
// in that case PhysicalConnection will call back to PhysicalBridge, and most of PhysicalBridge methods assumes that physical is not null; // in that case PhysicalConnection will call back to PhysicalBridge, and most of PhysicalBridge methods assumes that physical is not null;
physical = new PhysicalConnection(this); physical = new PhysicalConnection(this);
physical.BeginConnectAsync(log);
using (ExecutionContext.SuppressFlow())
{
physical.BeginConnectAsync(log);
}
} }
} }
return null; return null;
......
...@@ -77,7 +77,7 @@ internal async void BeginConnectAsync(TextWriter log) ...@@ -77,7 +77,7 @@ internal async void BeginConnectAsync(TextWriter log)
Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0); Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0);
var bridge = BridgeCouldBeNull; var bridge = BridgeCouldBeNull;
var endpoint = bridge?.ServerEndPoint?.EndPoint; var endpoint = bridge?.ServerEndPoint?.EndPoint;
if(endpoint == null) if (endpoint == null)
{ {
log?.WriteLine("No endpoint"); log?.WriteLine("No endpoint");
} }
...@@ -598,7 +598,7 @@ internal void OnBridgeHeartbeat() ...@@ -598,7 +598,7 @@ internal void OnBridgeHeartbeat()
internal void OnInternalError(Exception exception, [CallerMemberName] string origin = null) internal void OnInternalError(Exception exception, [CallerMemberName] string origin = null)
{ {
var bridge = BridgeCouldBeNull; var bridge = BridgeCouldBeNull;
if(bridge != null) if (bridge != null)
{ {
bridge.Multiplexer.OnInternalError(exception, bridge.ServerEndPoint.EndPoint, connectionType, origin); bridge.Multiplexer.OnInternalError(exception, bridge.ServerEndPoint.EndPoint, connectionType, origin);
} }
...@@ -983,7 +983,7 @@ internal static void WriteUnifiedPrefixedString(PipeWriter writer, byte[] prefix ...@@ -983,7 +983,7 @@ internal static void WriteUnifiedPrefixedString(PipeWriter writer, byte[] prefix
internal static Encoder GetPerThreadEncoder() internal static Encoder GetPerThreadEncoder()
{ {
var encoder = s_PerThreadEncoder; var encoder = s_PerThreadEncoder;
if(encoder == null) if (encoder == null)
{ {
s_PerThreadEncoder = encoder = Encoding.UTF8.GetEncoder(); s_PerThreadEncoder = encoder = Encoding.UTF8.GetEncoder();
} }
...@@ -1171,7 +1171,7 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc ...@@ -1171,7 +1171,7 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, TextWriter log, Soc
{ {
ssl.AuthenticateAsClient(host, config.SslProtocols); ssl.AuthenticateAsClient(host, config.SslProtocols);
} }
catch(Exception ex) catch (Exception ex)
{ {
Debug.WriteLine(ex.Message); Debug.WriteLine(ex.Message);
bridge.Multiplexer?.SetAuthSuspect(); bridge.Multiplexer?.SetAuthSuspect();
...@@ -1494,7 +1494,13 @@ private static RawResult ReadLineTerminatedString(ResultType type, ref BufferRea ...@@ -1494,7 +1494,13 @@ private static RawResult ReadLineTerminatedString(ResultType type, ref BufferRea
return new RawResult(type, payload, false); return new RawResult(type, payload, false);
} }
internal void StartReading() => ReadFromPipe(); internal void StartReading()
{
using (ExecutionContext.SuppressFlow())
{
ReadFromPipe();
}
}
internal static RawResult TryParseResult(in ReadOnlySequence<byte> buffer, ref BufferReader reader, internal static RawResult TryParseResult(in ReadOnlySequence<byte> buffer, ref BufferReader reader,
bool includeDetilInExceptions, ServerEndPoint server, bool allowInlineProtocol = false) bool includeDetilInExceptions, ServerEndPoint server, bool allowInlineProtocol = false)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment