Commit f343388c authored by Marc Gravell's avatar Marc Gravell

moved all socket-server code to Pipelines.Sockets.Unofficial

parent aaa6afa0
...@@ -19,8 +19,20 @@ public void ConfigureServices(IServiceCollection services) ...@@ -19,8 +19,20 @@ public void ConfigureServices(IServiceCollection services)
public void Dispose() => _server.Dispose(); public void Dispose() => _server.Dispose();
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env) public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
{ {
_server.Shutdown.ContinueWith((t, s) =>
{
try
{ // if the resp server is shutdown by a client: stop the kestrel server too
if (t.Result == RespServer.ShutdownReason.ClientInitiated)
{
((IApplicationLifetime)s).StopApplication();
}
}
catch { }
}, lifetime);
if (env.IsDevelopment()) app.UseDeveloperExceptionPage(); if (env.IsDevelopment()) app.UseDeveloperExceptionPage();
app.Run(context => context.Response.WriteAsync(_server.GetStats())); app.Run(context => context.Response.WriteAsync(_server.GetStats()));
} }
......
...@@ -277,7 +277,7 @@ protected virtual RedisResult Set(RedisClient client, RedisRequest request) ...@@ -277,7 +277,7 @@ protected virtual RedisResult Set(RedisClient client, RedisRequest request)
[RedisCommand(1)] [RedisCommand(1)]
protected new virtual RedisResult Shutdown(RedisClient client, RedisRequest request) protected new virtual RedisResult Shutdown(RedisClient client, RedisRequest request)
{ {
DoShutdown(); DoShutdown(ShutdownReason.ClientInitiated);
return RedisResult.OK; return RedisResult.OK;
} }
[RedisCommand(2)] [RedisCommand(2)]
...@@ -378,8 +378,8 @@ StringBuilder AddHeader() ...@@ -378,8 +378,8 @@ StringBuilder AddHeader()
{ {
sb.Append("process:").Append(process.Id).AppendLine(); sb.Append("process:").Append(process.Id).AppendLine();
} }
var port = TcpPort(); //var port = TcpPort();
if (port >= 0) sb.Append("tcp_port:").Append(port).AppendLine(); //if (port >= 0) sb.Append("tcp_port:").Append(port).AppendLine();
break; break;
case "Clients": case "Clients":
AddHeader().Append("connected_clients:").Append(ClientCount).AppendLine(); AddHeader().Append("connected_clients:").Append(ClientCount).AppendLine();
......
...@@ -5,8 +5,6 @@ ...@@ -5,8 +5,6 @@
using System.IO; using System.IO;
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Linq; using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection; using System.Reflection;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
...@@ -17,9 +15,14 @@ namespace StackExchange.Redis.Server ...@@ -17,9 +15,14 @@ namespace StackExchange.Redis.Server
{ {
public abstract partial class RespServer : IDisposable public abstract partial class RespServer : IDisposable
{ {
public enum ShutdownReason
{
ServerDisposed,
ClientInitiated,
}
private readonly List<RedisClient> _clients = new List<RedisClient>(); private readonly List<RedisClient> _clients = new List<RedisClient>();
private readonly TextWriter _output; private readonly TextWriter _output;
private Socket _listener;
public RespServer(TextWriter output = null) public RespServer(TextWriter output = null)
{ {
_output = output; _output = output;
...@@ -176,43 +179,7 @@ internal int NetArity() ...@@ -176,43 +179,7 @@ internal int NetArity()
} }
} }
delegate RedisResult RespOperation(RedisClient client, RedisRequest request); delegate RedisResult RespOperation(RedisClient client, RedisRequest request);
protected int TcpPort()
{
var ep = _listener?.LocalEndPoint;
if (ep is IPEndPoint ip) return ip.Port;
if (ep is DnsEndPoint dns) return dns.Port;
return -1;
}
private Action<object> _runClientCallback;
// KeepAlive here just to make the compiler happy that we've done *something* with the task
private Action<object> RunClientCallback => _runClientCallback ??
(_runClientCallback = state => GC.KeepAlive(RunClientAsync((IDuplexPipe)state)));
public void Listen(
EndPoint endpoint,
AddressFamily addressFamily = AddressFamily.InterNetwork,
SocketType socketType = SocketType.Stream,
ProtocolType protocolType = ProtocolType.Tcp,
PipeOptions sendOptions = null, PipeOptions receiveOptions = null)
{
Socket listener = new Socket(addressFamily, socketType, protocolType);
listener.Bind(endpoint);
listener.Listen(20);
_listener = listener;
StartOnScheduler(receiveOptions?.ReaderScheduler, _ => ListenForConnections(
sendOptions ?? PipeOptions.Default, receiveOptions ?? PipeOptions.Default), null);
Log("Server is listening on " + Format.ToString(endpoint));
}
private static void StartOnScheduler(PipeScheduler scheduler, Action<object> callback, object state)
{
if (scheduler == PipeScheduler.Inline) scheduler = null;
(scheduler ?? PipeScheduler.ThreadPool).Schedule(callback, state);
}
// for extensibility, so that a subclass can get their own client type // for extensibility, so that a subclass can get their own client type
// to be used via ListenForConnections // to be used via ListenForConnections
public virtual RedisClient CreateClient() => new RedisClient(); public virtual RedisClient CreateClient() => new RedisClient();
...@@ -244,33 +211,14 @@ public bool RemoveClient(RedisClient client) ...@@ -244,33 +211,14 @@ public bool RemoveClient(RedisClient client)
return _clients.Remove(client); return _clients.Remove(client);
} }
} }
private async void ListenForConnections(PipeOptions sendOptions, PipeOptions receiveOptions)
{
try
{
while (true)
{
var client = await _listener.AcceptAsync();
SocketConnection.SetRecommendedServerOptions(client);
var pipe = SocketConnection.Create(client, sendOptions, receiveOptions);
StartOnScheduler(receiveOptions.ReaderScheduler, RunClientCallback, pipe);
}
}
catch (NullReferenceException) { }
catch (ObjectDisposedException) { }
catch (Exception ex)
{
if (!_isShutdown) Log("Listener faulted: " + ex.Message);
}
}
private readonly TaskCompletionSource<int> _shutdown = new TaskCompletionSource<int>(); private readonly TaskCompletionSource<ShutdownReason> _shutdown = new TaskCompletionSource<ShutdownReason>(TaskCreationOptions.RunContinuationsAsynchronously);
private bool _isShutdown; private bool _isShutdown;
protected void ThrowIfShutdown() protected void ThrowIfShutdown()
{ {
if (_isShutdown) throw new InvalidOperationException("The server is shutting down"); if (_isShutdown) throw new InvalidOperationException("The server is shutting down");
} }
protected void DoShutdown(PipeScheduler scheduler = null) protected void DoShutdown(ShutdownReason reason, PipeScheduler scheduler = null)
{ {
if (_isShutdown) return; if (_isShutdown) return;
Log("Server shutting down..."); Log("Server shutting down...");
...@@ -280,19 +228,13 @@ protected void DoShutdown(PipeScheduler scheduler = null) ...@@ -280,19 +228,13 @@ protected void DoShutdown(PipeScheduler scheduler = null)
foreach (var client in _clients) client.Dispose(); foreach (var client in _clients) client.Dispose();
_clients.Clear(); _clients.Clear();
} }
StartOnScheduler(scheduler, _shutdown.TrySetResult(reason);
state => ((TaskCompletionSource<int>)state).TrySetResult(0), _shutdown);
} }
public Task Shutdown => _shutdown.Task; public Task<ShutdownReason> Shutdown => _shutdown.Task;
public void Dispose() => Dispose(true); public void Dispose() => Dispose(true);
protected virtual void Dispose(bool disposing) protected virtual void Dispose(bool disposing)
{ {
DoShutdown(); DoShutdown(ShutdownReason.ServerDisposed);
var socket = _listener;
if (socket != null)
{
try { socket.Dispose(); } catch { }
}
} }
public async Task RunClientAsync(IDuplexPipe pipe) public async Task RunClientAsync(IDuplexPipe pipe)
...@@ -335,7 +277,7 @@ public async Task RunClientAsync(IDuplexPipe pipe) ...@@ -335,7 +277,7 @@ public async Task RunClientAsync(IDuplexPipe pipe)
} }
} }
} }
private void Log(string message) public void Log(string message)
{ {
var output = _output; var output = _output;
if (output != null) if (output != null)
......
using System;
using System.Net;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;
namespace StackExchange.Redis.Server
{
public sealed class RespSocketServer : SocketServer
{
private readonly RespServer _server;
public RespSocketServer(RespServer server)
{
_server = server ?? throw new ArgumentNullException(nameof(server));
server.Shutdown.ContinueWith((t, o) => ((SocketServer)o).Dispose(), this);
}
protected override void OnStarted(EndPoint endPoint)
=> _server.Log("Server is listening on " + endPoint);
protected override Task OnClientConnectedAsync(in ClientConnection client)
=> _server.RunClientAsync(client.Transport);
protected override void Dispose(bool disposing)
{
if (disposing) _server.Dispose();
}
}
}
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.68" /> <PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.72" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" /> <PackageReference Include="System.Threading.Channels" Version="4.5.0" />
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
...@@ -35,12 +35,12 @@ public static RedisResult Create(RedisResult[] values) ...@@ -35,12 +35,12 @@ public static RedisResult Create(RedisResult[] values)
/// <summary> /// <summary>
/// An empty array result /// An empty array result
/// </summary> /// </summary>
public static RedisResult EmptyArray { get; } = new ArrayRedisResult(Array.Empty<RedisResult>()); internal static RedisResult EmptyArray { get; } = new ArrayRedisResult(Array.Empty<RedisResult>());
/// <summary> /// <summary>
/// A null array result /// A null array result
/// </summary> /// </summary>
public static RedisResult NullArray { get; } = new ArrayRedisResult(null); internal static RedisResult NullArray { get; } = new ArrayRedisResult(null);
// internally, this is very similar to RawResult, except it is designed to be usable // internally, this is very similar to RawResult, except it is designed to be usable
// outside of the IO-processing pipeline: the buffers are standalone, etc // outside of the IO-processing pipeline: the buffers are standalone, etc
...@@ -96,16 +96,16 @@ internal static RedisResult TryCreate(PhysicalConnection connection, RawResult r ...@@ -96,16 +96,16 @@ internal static RedisResult TryCreate(PhysicalConnection connection, RawResult r
/// <summary> /// <summary>
/// An integer-zero result /// An integer-zero result
/// </summary> /// </summary>
public static RedisResult Zero { get; } = Create(0, ResultType.Integer); internal static RedisResult Zero { get; } = Create(0, ResultType.Integer);
/// <summary> /// <summary>
/// An integer-one result /// An integer-one result
/// </summary> /// </summary>
public static RedisResult One { get; } = Create(1, ResultType.Integer); internal static RedisResult One { get; } = Create(1, ResultType.Integer);
/// <summary> /// <summary>
/// A null bulk-string result /// A null bulk-string result
/// </summary> /// </summary>
public static RedisResult Null { get; } = Create(RedisValue.Null, ResultType.BulkString); internal static RedisResult Null { get; } = Create(RedisValue.Null, ResultType.BulkString);
/// <summary> /// <summary>
/// Interprets the result as a <see cref="string"/>. /// Interprets the result as a <see cref="string"/>.
......
...@@ -7,10 +7,11 @@ static class Program ...@@ -7,10 +7,11 @@ static class Program
{ {
static async Task Main() static async Task Main()
{ {
using (var server = new MemoryCacheRedisServer(Console.Out)) using (var resp = new MemoryCacheRedisServer(Console.Out))
using (var socket = new RespSocketServer(resp))
{ {
server.Listen(new IPEndPoint(IPAddress.Loopback, 6378)); socket.Listen(new IPEndPoint(IPAddress.Loopback, 6378));
await server.Shutdown; await resp.Shutdown;
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment