Commit 71418c9f authored by Nick Craver's avatar Nick Craver

Remove ISocketCallback interface

This simplifies internal logistics and our profiling a bit
parent 18d14db3
...@@ -401,7 +401,7 @@ internal void OnHeartbeat(bool ifConnectedOnly) ...@@ -401,7 +401,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
Interlocked.Exchange(ref connectTimeoutRetryCount, 0); Interlocked.Exchange(ref connectTimeoutRetryCount, 0);
tmp.Bridge.ServerEndPoint.ClearUnselectable(UnselectableFlags.DidNotRespond); tmp.Bridge.ServerEndPoint.ClearUnselectable(UnselectableFlags.DidNotRespond);
} }
tmp.OnHeartbeat(); tmp.OnBridgeHeartbeat();
int writeEverySeconds = ServerEndPoint.WriteEverySeconds, int writeEverySeconds = ServerEndPoint.WriteEverySeconds,
checkConfigSeconds = Multiplexer.RawConfig.ConfigCheckSeconds; checkConfigSeconds = Multiplexer.RawConfig.ConfigCheckSeconds;
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
internal sealed partial class PhysicalConnection : IDisposable, ISocketCallback internal sealed partial class PhysicalConnection : IDisposable
{ {
internal readonly byte[] ChannelPrefix; internal readonly byte[] ChannelPrefix;
...@@ -390,7 +390,7 @@ internal void GetStormLog(StringBuilder sb) ...@@ -390,7 +390,7 @@ internal void GetStormLog(StringBuilder sb)
} }
} }
internal void OnHeartbeat() internal void OnBridgeHeartbeat()
{ {
Interlocked.Exchange(ref lastBeatTickCount, Environment.TickCount); Interlocked.Exchange(ref lastBeatTickCount, Environment.TickCount);
} }
...@@ -871,7 +871,7 @@ private static LocalCertificateSelectionCallback GetAmbientCertificateCallback() ...@@ -871,7 +871,7 @@ private static LocalCertificateSelectionCallback GetAmbientCertificateCallback()
return null; return null;
} }
async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(Socket socket, TextWriter log, SocketManager manager) internal async ValueTask<SocketMode> ConnectedAsync(Socket socket, TextWriter log, SocketManager manager)
{ {
try try
{ {
...@@ -937,7 +937,7 @@ async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(Socket socket, TextWr ...@@ -937,7 +937,7 @@ async ValueTask<SocketMode> ISocketCallback.ConnectedAsync(Socket socket, TextWr
} }
} }
void ISocketCallback.Error() internal void Error()
{ {
RecordConnectionFailed(ConnectionFailureType.SocketFailure); RecordConnectionFailed(ConnectionFailureType.SocketFailure);
} }
...@@ -1016,7 +1016,7 @@ private void MatchResult(RawResult result) ...@@ -1016,7 +1016,7 @@ private void MatchResult(RawResult result)
partial void OnCreateEcho(); partial void OnCreateEcho();
partial void OnDebugAbort(); partial void OnDebugAbort();
void ISocketCallback.OnHeartbeat() internal void OnHeartbeat()
{ {
try try
{ {
...@@ -1202,7 +1202,7 @@ private RawResult ReadLineTerminatedString(ResultType type, in ReadOnlySequence< ...@@ -1202,7 +1202,7 @@ private RawResult ReadLineTerminatedString(ResultType type, in ReadOnlySequence<
return new RawResult(type, payload, false); return new RawResult(type, payload, false);
} }
void ISocketCallback.StartReading() => ReadFromPipe(); internal void StartReading() => ReadFromPipe();
private RawResult TryParseResult(in ReadOnlySequence<byte> buffer, ref BufferReader reader) private RawResult TryParseResult(in ReadOnlySequence<byte> buffer, ref BufferReader reader)
{ {
......
...@@ -15,40 +15,6 @@ internal enum SocketMode ...@@ -15,40 +15,6 @@ internal enum SocketMode
Async, Async,
} }
/// <summary>
/// Allows callbacks from SocketManager as work is discovered
/// </summary>
internal partial interface ISocketCallback
{
/// <summary>
/// Indicates that a socket has connected
/// </summary>
/// <param name="socket">The socket.</param>
/// <param name="log">A text logger to write to.</param>
/// <param name="manager">The manager that will be owning this socket.</param>
ValueTask<SocketMode> ConnectedAsync(Socket socket, TextWriter log, SocketManager manager);
/// <summary>
/// Indicates that the socket has signalled an error condition
/// </summary>
void Error();
void OnHeartbeat();
///// <summary>
///// Indicates that data is available on the socket, and that the consumer should read synchronously from the socket while there is data
///// </summary>
//void Read();
/// <summary>
/// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously
/// </summary>
void StartReading();
// check for write-read timeout
void CheckForStaleConnection(ref SocketManager.ManagerState state);
}
/// <summary> /// <summary>
/// A SocketManager monitors multiple sockets for availability of data; this is done using /// A SocketManager monitors multiple sockets for availability of data; this is done using
/// the Socket.Select API and a dedicated reader-thread, which allows for fast responses /// the Socket.Select API and a dedicated reader-thread, which allows for fast responses
...@@ -197,7 +163,7 @@ private void Dispose(bool disposing) ...@@ -197,7 +163,7 @@ private void Dispose(bool disposing)
/// </summary> /// </summary>
~SocketManager() => Dispose(false); ~SocketManager() => Dispose(false);
internal Socket BeginConnect(EndPoint endpoint, ISocketCallback callback, ConnectionMultiplexer multiplexer, TextWriter log) internal Socket BeginConnect(EndPoint endpoint, PhysicalConnection callback, ConnectionMultiplexer multiplexer, TextWriter log)
{ {
void RunWithCompletionType(Func<AsyncCallback, IAsyncResult> beginAsync, AsyncCallback asyncCallback) void RunWithCompletionType(Func<AsyncCallback, IAsyncResult> beginAsync, AsyncCallback asyncCallback)
{ {
...@@ -225,28 +191,27 @@ void proxyCallback(IAsyncResult ar) ...@@ -225,28 +191,27 @@ void proxyCallback(IAsyncResult ar)
try try
{ {
var formattedEndpoint = Format.ToString(endpoint); var formattedEndpoint = Format.ToString(endpoint);
var tuple = Tuple.Create(socket, callback);
multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint); multiplexer.LogLocked(log, "BeginConnect: {0}", formattedEndpoint);
// A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state) // A work-around for a Mono bug in BeginConnect(EndPoint endpoint, AsyncCallback callback, object state)
if (endpoint is DnsEndPoint dnsEndpoint) if (endpoint is DnsEndPoint dnsEndpoint)
{ {
RunWithCompletionType( RunWithCompletionType(
cb => socket.BeginConnect(dnsEndpoint.Host, dnsEndpoint.Port, cb, tuple), cb => socket.BeginConnect(dnsEndpoint.Host, dnsEndpoint.Port, cb, null),
ar => ar =>
{ {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint); multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple); EndConnectImpl(ar, multiplexer, log, socket, callback);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint); multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
}); });
} }
else else
{ {
RunWithCompletionType( RunWithCompletionType(
cb => socket.BeginConnect(endpoint, cb, tuple), cb => socket.BeginConnect(endpoint, cb, null),
ar => ar =>
{ {
multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint); multiplexer.LogLocked(log, "EndConnect: {0}", formattedEndpoint);
EndConnectImpl(ar, multiplexer, log, tuple); EndConnectImpl(ar, multiplexer, log, socket, callback);
multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint); multiplexer.LogLocked(log, "Connect complete: {0}", formattedEndpoint);
}); });
} }
...@@ -261,24 +226,24 @@ void proxyCallback(IAsyncResult ar) ...@@ -261,24 +226,24 @@ void proxyCallback(IAsyncResult ar)
} }
return socket; return socket;
} }
private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multiplexer, TextWriter log, Tuple<Socket, ISocketCallback> tuple) private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multiplexer, TextWriter log, Socket socket, PhysicalConnection connection)
{ {
var socket = tuple.Item1;
var callback = tuple.Item2;
try try
{ {
bool ignoreConnect = false; bool ignoreConnect = false;
ShouldIgnoreConnect(tuple.Item2, ref ignoreConnect); ShouldIgnoreConnect(connection, ref ignoreConnect);
if (ignoreConnect) return; if (ignoreConnect) return;
socket.EndConnect(ar); socket.EndConnect(ar);
var socketMode = callback == null ? SocketMode.Abort : await callback.ConnectedAsync(socket, log, this); var socketMode = connection == null ? SocketMode.Abort : await connection.ConnectedAsync(socket, log, this).ForAwait();
switch (socketMode) switch (socketMode)
{ {
case SocketMode.Async: case SocketMode.Async:
multiplexer.LogLocked(log, "Starting read"); multiplexer.LogLocked(log, "Starting read");
try try
{ callback.StartReading(); } {
connection.StartReading();
}
catch (Exception ex) catch (Exception ex)
{ {
ConnectionMultiplexer.TraceWithoutContext(ex.Message); ConnectionMultiplexer.TraceWithoutContext(ex.Message);
...@@ -294,9 +259,9 @@ private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multipl ...@@ -294,9 +259,9 @@ private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multipl
catch (ObjectDisposedException) catch (ObjectDisposedException)
{ {
multiplexer.LogLocked(log, "(socket shutdown)"); multiplexer.LogLocked(log, "(socket shutdown)");
if (callback != null) if (connection != null)
{ {
try { callback.Error(); } try { connection.Error(); }
catch (Exception inner) catch (Exception inner)
{ {
ConnectionMultiplexer.TraceWithoutContext(inner.Message); ConnectionMultiplexer.TraceWithoutContext(inner.Message);
...@@ -306,9 +271,9 @@ private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multipl ...@@ -306,9 +271,9 @@ private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multipl
catch (Exception outer) catch (Exception outer)
{ {
ConnectionMultiplexer.TraceWithoutContext(outer.Message); ConnectionMultiplexer.TraceWithoutContext(outer.Message);
if (callback != null) if (connection != null)
{ {
try { callback.Error(); } try { connection.Error(); }
catch (Exception inner) catch (Exception inner)
{ {
ConnectionMultiplexer.TraceWithoutContext(inner.Message); ConnectionMultiplexer.TraceWithoutContext(inner.Message);
...@@ -320,7 +285,7 @@ private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multipl ...@@ -320,7 +285,7 @@ private async void EndConnectImpl(IAsyncResult ar, ConnectionMultiplexer multipl
partial void OnDispose(); partial void OnDispose();
partial void OnShutdown(Socket socket); partial void OnShutdown(Socket socket);
partial void ShouldIgnoreConnect(ISocketCallback callback, ref bool ignore); partial void ShouldIgnoreConnect(PhysicalConnection callback, ref bool ignore);
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")] [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
internal void Shutdown(Socket socket) internal void Shutdown(Socket socket)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment