Commit 062248b6 authored by Marc Gravell's avatar Marc Gravell

If the incoming socket isn't vanilla, we need to use async-read rather than...

If the incoming socket isn't vanilla, we need to use async-read rather than socket-poll (the value of .Available is meaningless)
parent a92d7ff3
...@@ -66,10 +66,10 @@ public void ConnectToSSLServer(int port, string sslHost) ...@@ -66,10 +66,10 @@ public void ConnectToSSLServer(int port, string sslHost)
// perf: sync/multi-threaded // perf: sync/multi-threaded
TestConcurrent(db, key, 30, 10); TestConcurrent(db, key, 30, 10);
//TestConcurrent(db, key, 30, 20); TestConcurrent(db, key, 30, 20);
//TestConcurrent(db, key, 30, 30); TestConcurrent(db, key, 30, 30);
//TestConcurrent(db, key, 30, 40); TestConcurrent(db, key, 30, 40);
//TestConcurrent(db, key, 30, 50); TestConcurrent(db, key, 30, 50);
} }
} }
......
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
using System.Linq; using System.Linq;
using System.Net; using System.Net;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Runtime.Remoting.Lifetime;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
...@@ -822,8 +821,6 @@ private void OnHeartbeat() ...@@ -822,8 +821,6 @@ private void OnHeartbeat()
long now = Environment.TickCount; long now = Environment.TickCount;
Interlocked.Exchange(ref lastHeartbeatTicks, now); Interlocked.Exchange(ref lastHeartbeatTicks, now);
Interlocked.Exchange(ref lastGlobalHeartbeatTicks, now); Interlocked.Exchange(ref lastGlobalHeartbeatTicks, now);
var lease = pulse == null ? null : pulse.GetLifetimeService() as ILease;
if (lease != null) lease.Renew(TimeSpan.FromMinutes(5));
Trace("heartbeat"); Trace("heartbeat");
var tmp = serverSnapshot; var tmp = serverSnapshot;
...@@ -1238,8 +1235,6 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1238,8 +1235,6 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
{ {
LogLocked(log, "Starting heartbeat..."); LogLocked(log, "Starting heartbeat...");
pulse = new Timer(heartbeat, this, MillisecondsPerHeartbeat, MillisecondsPerHeartbeat); pulse = new Timer(heartbeat, this, MillisecondsPerHeartbeat, MillisecondsPerHeartbeat);
ILease lease = pulse.InitializeLifetimeService() as ILease;
if(lease != null) lease.Renew(TimeSpan.FromMinutes(5));
} }
string stormLog = GetStormLog(); string stormLog = GetStormLog();
......
...@@ -470,10 +470,11 @@ static void WriteUnified(Stream stream, long value) ...@@ -470,10 +470,11 @@ static void WriteUnified(Stream stream, long value)
WriteRaw(stream, value, withLengthPrefix: true); WriteRaw(stream, value, withLengthPrefix: true);
} }
void ISocketCallback.Connected(Stream stream) SocketMode ISocketCallback.Connected(Stream stream)
{ {
try try
{ {
var socketMode = SocketMode.Poll;
// disallow connection in some cases // disallow connection in some cases
OnDebugAbort(); OnDebugAbort();
...@@ -486,6 +487,7 @@ void ISocketCallback.Connected(Stream stream) ...@@ -486,6 +487,7 @@ void ISocketCallback.Connected(Stream stream)
var ssl = new SslStream(stream, false, config.CertificateValidationCallback, config.CertificateSelectionCallback, EncryptionPolicy.RequireEncryption); var ssl = new SslStream(stream, false, config.CertificateValidationCallback, config.CertificateSelectionCallback, EncryptionPolicy.RequireEncryption);
ssl.AuthenticateAsClient(config.SslHost); ssl.AuthenticateAsClient(config.SslHost);
stream = ssl; stream = ssl;
socketMode = SocketMode.Async;
} }
OnWrapForLogging(ref stream, physicalName); OnWrapForLogging(ref stream, physicalName);
...@@ -495,12 +497,13 @@ void ISocketCallback.Connected(Stream stream) ...@@ -495,12 +497,13 @@ void ISocketCallback.Connected(Stream stream)
multiplexer.Trace("Connected", physicalName); multiplexer.Trace("Connected", physicalName);
bridge.OnConnected(this); bridge.OnConnected(this);
return socketMode;
} }
catch (Exception ex) catch (Exception ex)
{ {
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); // includes a bridge.OnDisconnected RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); // includes a bridge.OnDisconnected
multiplexer.Trace("Could not connect: " + ex.Message, physicalName); multiplexer.Trace("Could not connect: " + ex.Message, physicalName);
return SocketMode.Abort;
} }
} }
...@@ -630,33 +633,10 @@ void ISocketCallback.Read() ...@@ -630,33 +633,10 @@ void ISocketCallback.Read()
do do
{ {
int space = EnsureSpaceAndComputeBytesToRead(); int space = EnsureSpaceAndComputeBytesToRead();
int bytesRead = netStream.Read(ioBuffer, ioBufferBytes, space); var tmp = netStream;
int bytesRead = tmp == null ? 0 : tmp.Read(ioBuffer, ioBufferBytes, space);
if (bytesRead <= 0) if (!ProcessReadBytes(bytesRead)) return; // EOF
{
multiplexer.Trace("EOF", physicalName);
RecordConnectionFailed(ConnectionFailureType.SocketClosed);
return;
}
Interlocked.Exchange(ref lastReadTickCount, Environment.TickCount);
ioBufferBytes += bytesRead;
multiplexer.Trace("More bytes available: " + bytesRead + " (" + ioBufferBytes + ")", physicalName);
int offset = 0, count = ioBufferBytes;
int handled = ProcessBuffer(ioBuffer, ref offset, ref count);
multiplexer.Trace("Processed: " + handled, physicalName);
if (handled != 0)
{
// read stuff
if (count != 0)
{
multiplexer.Trace("Copying remaining bytes: " + count, physicalName);
// if anything was left over, we need to copy it to
// the start of the buffer so it can be used next time
Buffer.BlockCopy(ioBuffer, offset, ioBuffer, 0, count);
}
ioBufferBytes = count;
}
} while (socketToken.Available != 0); } while (socketToken.Available != 0);
multiplexer.Trace("Buffer exhausted", physicalName); multiplexer.Trace("Buffer exhausted", physicalName);
// ^^^ note that the socket manager will call us again when there is something to do // ^^^ note that the socket manager will call us again when there is something to do
...@@ -666,6 +646,79 @@ void ISocketCallback.Read() ...@@ -666,6 +646,79 @@ void ISocketCallback.Read()
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
} }
} }
private bool ProcessReadBytes(int bytesRead)
{
if (bytesRead <= 0)
{
multiplexer.Trace("EOF", physicalName);
RecordConnectionFailed(ConnectionFailureType.SocketClosed);
return false;
}
Interlocked.Exchange(ref lastReadTickCount, Environment.TickCount);
ioBufferBytes += bytesRead;
multiplexer.Trace("More bytes available: " + bytesRead + " (" + ioBufferBytes + ")", physicalName);
int offset = 0, count = ioBufferBytes;
int handled = ProcessBuffer(ioBuffer, ref offset, ref count);
multiplexer.Trace("Processed: " + handled, physicalName);
if (handled != 0)
{
// read stuff
if (count != 0)
{
multiplexer.Trace("Copying remaining bytes: " + count, physicalName);
// if anything was left over, we need to copy it to
// the start of the buffer so it can be used next time
Buffer.BlockCopy(ioBuffer, offset, ioBuffer, 0, count);
}
ioBufferBytes = count;
}
return true;
}
static readonly AsyncCallback endRead = result =>
{
PhysicalConnection physical;
if (result.CompletedSynchronously || (physical = result.AsyncState as PhysicalConnection) == null) return;
physical.multiplexer.Trace("Completed synchronously: processing in callback", physical.physicalName);
if(physical.EndReading(result)) physical.BeginReading();
};
private bool EndReading(IAsyncResult result)
{
try
{
var tmp = netStream;
int bytesRead = tmp == null ? 0 : tmp.EndRead(result);
return ProcessReadBytes(bytesRead);
} catch(Exception ex)
{
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
return false;
}
}
void ISocketCallback.StartReading()
{
BeginReading();
}
void BeginReading()
{
bool keepReading;
do
{
keepReading = false;
int space = EnsureSpaceAndComputeBytesToRead();
multiplexer.Trace("Beginning async read...", physicalName);
var result = netStream.BeginRead(ioBuffer, ioBufferBytes, space, endRead, this);
if (result.CompletedSynchronously)
{
multiplexer.Trace("Completed synchronously: processing immediately", physicalName);
keepReading = EndReading(result);
}
} while (keepReading);
}
private RawResult ReadArray(byte[] buffer, ref int offset, ref int count) private RawResult ReadArray(byte[] buffer, ref int offset, ref int count)
{ {
var itemCount = ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count); var itemCount = ReadLineTerminatedString(ResultType.Integer, buffer, ref offset, ref count);
......
...@@ -155,7 +155,7 @@ private void ReadImpl() ...@@ -155,7 +155,7 @@ private void ReadImpl()
foreach(var pair in allSocketPairs) foreach(var pair in allSocketPairs)
{ {
var callback = pair.Callback; var callback = pair.Callback;
if (callback != null) callback.OnHeartbeat(); if (callback != null) try { callback.OnHeartbeat(); } catch { }
} }
} }
...@@ -275,9 +275,8 @@ public TimeValue(int microSeconds) ...@@ -275,9 +275,8 @@ public TimeValue(int microSeconds)
} }
} }
internal void Shutdown(SocketToken token) private void Shutdown(Socket socket)
{ {
var socket = token.Socket;
if (socket != null) if (socket != null)
{ {
lock (socketLookup) lock (socketLookup)
...@@ -289,6 +288,10 @@ internal void Shutdown(SocketToken token) ...@@ -289,6 +288,10 @@ internal void Shutdown(SocketToken token)
try { socket.Dispose(); } catch { } try { socket.Dispose(); } catch { }
} }
} }
internal void Shutdown(SocketToken token)
{
Shutdown(token.Socket);
}
private readonly object QueueDrainSyncLock = new object(); private readonly object QueueDrainSyncLock = new object();
static readonly WaitCallback HelpProcessItems = state => static readonly WaitCallback HelpProcessItems = state =>
...@@ -483,8 +486,20 @@ private void EndConnect(IAsyncResult ar) ...@@ -483,8 +486,20 @@ private void EndConnect(IAsyncResult ar)
var callback = tuple.Item2; var callback = tuple.Item2;
socket.EndConnect(ar); socket.EndConnect(ar);
var netStream = new NetworkStream(socket, false); var netStream = new NetworkStream(socket, false);
callback.Connected(netStream); var socketMode = callback == null ? SocketMode.Abort : callback.Connected(netStream);
AddRead(socket, callback); switch (socketMode)
{
case SocketMode.Poll:
AddRead(socket, callback);
break;
case SocketMode.Async:
try { callback.StartReading(); }
catch { Shutdown(socket); }
break;
default:
Shutdown(socket);
break;
}
} }
catch catch
{ {
...@@ -507,18 +522,29 @@ internal interface ISocketCallback ...@@ -507,18 +522,29 @@ internal interface ISocketCallback
/// <summary> /// <summary>
/// Indicates that a socket has connected /// Indicates that a socket has connected
/// </summary> /// </summary>
void Connected(Stream stream); SocketMode Connected(Stream stream);
/// <summary> /// <summary>
/// Indicates that data is available on the socket, and that the consumer should read from the socket /// Indicates that data is available on the socket, and that the consumer should read synchronously from the socket while there is data
/// </summary> /// </summary>
void Read(); void Read();
/// <summary> /// <summary>
/// Indicates that we cannot know whether data is available, and that the consume should commence reading asynchronously
/// </summary>
void StartReading();
/// <summary>
/// Indicates that the socket has signalled an error condition /// Indicates that the socket has signalled an error condition
/// </summary> /// </summary>
void Error(); void Error();
void OnHeartbeat(); void OnHeartbeat();
} }
internal enum SocketMode
{
Abort,
Poll,
Async
}
internal struct SocketToken internal struct SocketToken
{ {
internal readonly Socket Socket; internal readonly Socket Socket;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment