Commit c83fa66a authored by yangxiaodong's avatar yangxiaodong

add Gateway middleware

parent 37e1022e
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Dashboard.GatewayProxy.Request.Middleware;
using DotNetCore.CAP.Dashboard.GatewayProxy.Requester.Middleware;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
......@@ -49,7 +51,13 @@ namespace Microsoft.AspNetCore.Builder
throw new InvalidOperationException("Add Cap must be called on the service collection.");
}
app.Map(new PathString(pathMatch), x => x.UseMiddleware<DashboardMiddleware>());
app.Map(new PathString(pathMatch), x =>
{
x.UseDownstreamRequestInitialiser();
x.UseHttpRequestBuilderMiddleware();
x.UseHttpRequesterMiddleware();
x.UseMiddleware<DashboardMiddleware>();
});
return app;
}
......
......@@ -5,7 +5,8 @@ using System.Text;
namespace DotNetCore.CAP
{
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.NodeDiscovery;
using DotNetCore.CAP.Dashboard.GatewayProxy;
using DotNetCore.CAP.Dashboard.GatewayProxy.Requester;
using Microsoft.Extensions.DependencyInjection;
internal sealed class DashboardOptionsExtension : ICapOptionsExtension
......@@ -23,6 +24,10 @@ namespace DotNetCore.CAP
_options?.Invoke(dashboardOptions);
services.AddSingleton(dashboardOptions);
services.AddSingleton(DashboardRoutes.Routes);
services.AddSingleton<IHttpRequester, HttpClientHttpRequester>();
services.AddSingleton<IHttpClientCache, MemoryHttpClientCache>();
services.AddScoped<IRequestScopedDataRepository, HttpDataRepository>();
}
}
}
......
using System;
using System.Collections.Generic;
using System.Text;
namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
public class DownstreamUrl
{
public DownstreamUrl(string value)
{
Value = value;
}
public string Value { get; private set; }
}
}
using System.Collections.Generic;
using System.Net.Http;
namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
public abstract class GatewayProxyMiddleware
{
private readonly IRequestScopedDataRepository _requestScopedDataRepository;
protected GatewayProxyMiddleware(IRequestScopedDataRepository requestScopedDataRepository)
{
_requestScopedDataRepository = requestScopedDataRepository;
MiddlewareName = this.GetType().Name;
}
public string MiddlewareName { get; }
//public DownstreamRoute DownstreamRoute => _requestScopedDataRepository.Get<DownstreamRoute>("DownstreamRoute");
public HttpRequestMessage Request => _requestScopedDataRepository.Get<HttpRequestMessage>("Request");
public HttpRequestMessage DownstreamRequest => _requestScopedDataRepository.Get<HttpRequestMessage>("DownstreamRequest");
public HttpResponseMessage HttpResponseMessage => _requestScopedDataRepository.Get<HttpResponseMessage>("HttpResponseMessage");
public void SetUpstreamRequestForThisRequest(HttpRequestMessage request)
{
_requestScopedDataRepository.Add("Request", request);
}
public void SetDownstreamRequest(HttpRequestMessage request)
{
_requestScopedDataRepository.Add("DownstreamRequest", request);
}
public void SetHttpResponseMessageThisRequest(HttpResponseMessage responseMessage)
{
_requestScopedDataRepository.Add("HttpResponseMessage", responseMessage);
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
public class HostAndPort
{
public HostAndPort(string downstreamHost, int downstreamPort)
{
DownstreamHost = downstreamHost?.Trim('/');
DownstreamPort = downstreamPort;
}
public string DownstreamHost { get; private set; }
public int DownstreamPort { get; private set; }
}
}
using System;
using Microsoft.AspNetCore.Http;
namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
public class HttpDataRepository : IRequestScopedDataRepository
{
private readonly IHttpContextAccessor _httpContextAccessor;
public HttpDataRepository(IHttpContextAccessor httpContextAccessor)
{
_httpContextAccessor = httpContextAccessor;
}
public void Add<T>(string key, T value)
{
_httpContextAccessor.HttpContext.Items.Add(key, value);
}
public T Get<T>(string key)
{
object obj;
if (_httpContextAccessor.HttpContext.Items.TryGetValue(key, out obj))
{
return (T)obj;
}
throw new Exception($"Unable to find data for key: {key}");
}
}
}
using System.Collections.Generic;
namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
public interface IRequestScopedDataRepository
{
void Add<T>(string key, T value);
T Get<T>(string key);
}
}
\ No newline at end of file
namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
public interface IRequestMapper
{
Task<HttpRequestMessage> Map(HttpRequest request);
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Primitives;
namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
public class RequestMapper : IRequestMapper
{
private readonly string[] _unsupportedHeaders = { "host" };
private const string SchemeDelimiter = "://";
public async Task<HttpRequestMessage> Map(HttpRequest request)
{
try
{
var requestMessage = new HttpRequestMessage()
{
Content = await MapContent(request),
Method = MapMethod(request),
RequestUri = MapUri(request)
};
MapHeaders(request, requestMessage);
return requestMessage;
}
catch (Exception ex)
{
throw new Exception($"Error when parsing incoming request, exception: {ex.Message}");
}
}
private async Task<HttpContent> MapContent(HttpRequest request)
{
if (request.Body == null)
{
return null;
}
var content = new ByteArrayContent(await ToByteArray(request.Body));
content.Headers.TryAddWithoutValidation("Content-Type", new[] {request.ContentType});
return content;
}
private HttpMethod MapMethod(HttpRequest request)
{
return new HttpMethod(request.Method);
}
private Uri MapUri(HttpRequest request)
{
return new Uri(GetEncodedUrl(request));
}
private void MapHeaders(HttpRequest request, HttpRequestMessage requestMessage)
{
foreach (var header in request.Headers)
{
//todo get rid of if..
if (IsSupportedHeader(header))
{
requestMessage.Headers.TryAddWithoutValidation(header.Key, header.Value.ToArray());
}
}
}
private async Task<byte[]> ToByteArray(Stream stream)
{
using (stream)
{
using (var memStream = new MemoryStream())
{
await stream.CopyToAsync(memStream);
return memStream.ToArray();
}
}
}
private bool IsSupportedHeader(KeyValuePair<string, StringValues> header)
{
return !_unsupportedHeaders.Contains(header.Key.ToLower());
}
/// <summary>
/// Combines the given URI components into a string that is properly encoded for use in HTTP headers.
/// Note that unicode in the HostString will be encoded as punycode.
/// </summary>
/// <param name="scheme">http, https, etc.</param>
/// <param name="host">The host portion of the uri normally included in the Host header. This may include the port.</param>
/// <param name="pathBase">The first portion of the request path associated with application root.</param>
/// <param name="path">The portion of the request path that identifies the requested resource.</param>
/// <param name="query">The query, if any.</param>
/// <param name="fragment">The fragment, if any.</param>
/// <returns></returns>
public string BuildAbsolute(
string scheme,
HostString host,
PathString pathBase = new PathString(),
PathString path = new PathString(),
QueryString query = new QueryString(),
FragmentString fragment = new FragmentString())
{
if (scheme == null)
{
throw new ArgumentNullException(nameof(scheme));
}
var combinedPath = (pathBase.HasValue || path.HasValue) ? (pathBase + path).ToString() : "/";
var encodedHost = host.ToString();
var encodedQuery = query.ToString();
var encodedFragment = fragment.ToString();
// PERF: Calculate string length to allocate correct buffer size for StringBuilder.
var length = scheme.Length + SchemeDelimiter.Length + encodedHost.Length
+ combinedPath.Length + encodedQuery.Length + encodedFragment.Length;
return new StringBuilder(length)
.Append(scheme)
.Append(SchemeDelimiter)
.Append(encodedHost)
.Append(combinedPath)
.Append(encodedQuery)
.Append(encodedFragment)
.ToString();
}
/// <summary>
/// Returns the combined components of the request URL in a fully escaped form suitable for use in HTTP headers
/// and other HTTP operations.
/// </summary>
/// <param name="request">The request to assemble the uri pieces from.</param>
/// <returns></returns>
public string GetEncodedUrl(HttpRequest request)
{
return BuildAbsolute(request.Scheme, request.Host, request.PathBase, request.Path, request.QueryString);
}
}
}
namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
public class DownstreamRequestInitialiserMiddleware : GatewayProxyMiddleware
{
private readonly RequestDelegate _next;
private readonly ILogger _logger;
private readonly IRequestMapper _requestMapper;
public DownstreamRequestInitialiserMiddleware(RequestDelegate next,
ILoggerFactory loggerFactory,
IRequestScopedDataRepository requestScopedDataRepository,
IRequestMapper requestMapper)
:base(requestScopedDataRepository)
{
_next = next;
_logger = loggerFactory.CreateLogger<DownstreamRequestInitialiserMiddleware>();
_requestMapper = requestMapper;
}
public async Task Invoke(HttpContext context)
{
_logger.LogDebug("started calling request builder middleware");
var downstreamRequest = await _requestMapper.Map(context.Request);
SetDownstreamRequest(downstreamRequest);
_logger.LogDebug("calling next middleware");
await _next.Invoke(context);
_logger.LogDebug("succesfully called next middleware");
}
}
}
\ No newline at end of file
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Dashboard.GatewayProxy
{
public class HttpRequestBuilderMiddleware : GatewayProxyMiddleware
{
private readonly RequestDelegate _next;
private readonly ILogger _logger;
public HttpRequestBuilderMiddleware(RequestDelegate next,
ILoggerFactory loggerFactory,
IRequestScopedDataRepository requestScopedDataRepository )
:base(requestScopedDataRepository)
{
_next = next;
_logger = loggerFactory.CreateLogger<HttpRequestBuilderMiddleware>();
}
public async Task Invoke(HttpContext context)
{
_logger.LogDebug("started calling request builder middleware");
_logger.LogDebug("setting upstream request");
SetUpstreamRequestForThisRequest(DownstreamRequest);
_logger.LogDebug("calling next middleware");
await _next.Invoke(context);
_logger.LogDebug("succesfully called next middleware");
}
}
}
\ No newline at end of file
using Microsoft.AspNetCore.Builder;
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Request.Middleware
{
public static class HttpRequestBuilderMiddlewareExtensions
{
public static IApplicationBuilder UseHttpRequestBuilderMiddleware(this IApplicationBuilder builder)
{
return builder.UseMiddleware<HttpRequestBuilderMiddleware>();
}
public static IApplicationBuilder UseDownstreamRequestInitialiser(this IApplicationBuilder builder)
{
return builder.UseMiddleware<DownstreamRequestInitialiserMiddleware>();
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
{
internal class HttpClientBuilder : IHttpClientBuilder
{
private readonly Dictionary<int, Func<DelegatingHandler>> _handlers = new Dictionary<int, Func<DelegatingHandler>>();
public IHttpClient Create()
{
var httpclientHandler = new HttpClientHandler();
var client = new HttpClient(CreateHttpMessageHandler(httpclientHandler));
return new HttpClientWrapper(client);
}
private HttpMessageHandler CreateHttpMessageHandler(HttpMessageHandler httpMessageHandler)
{
_handlers
.OrderByDescending(handler => handler.Key)
.Select(handler => handler.Value)
.Reverse()
.ToList()
.ForEach(handler =>
{
var delegatingHandler = handler();
delegatingHandler.InnerHandler = httpMessageHandler;
httpMessageHandler = delegatingHandler;
});
return httpMessageHandler;
}
}
/// <summary>
/// This class was made to make unit testing easier when HttpClient is used.
/// </summary>
internal class HttpClientWrapper : IHttpClient
{
public HttpClient Client { get; }
public HttpClientWrapper(HttpClient client)
{
Client = client;
}
public Task<HttpResponseMessage> SendAsync(HttpRequestMessage request)
{
return Client.SendAsync(request);
}
}
}
using System;
using System.Collections.Concurrent;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
{
public class HttpClientHttpRequester : IHttpRequester
{
private readonly IHttpClientCache _cacheHandlers;
private readonly ILogger _logger;
public HttpClientHttpRequester(ILoggerFactory loggerFactory, IHttpClientCache cacheHandlers)
{
_logger = loggerFactory.CreateLogger<HttpClientHttpRequester>();
_cacheHandlers = cacheHandlers;
}
public async Task<HttpResponseMessage> GetResponse(HttpRequestMessage request)
{
var builder = new HttpClientBuilder();
var cacheKey = GetCacheKey(request, builder);
var httpClient = GetHttpClient(cacheKey, builder);
try
{
return await httpClient.SendAsync(request);
}
catch (Exception exception)
{
_logger.LogError("Error making http request, exception:" + exception.Message);
throw exception;
}
finally
{
_cacheHandlers.Set(cacheKey, httpClient, TimeSpan.FromHours(24));
}
}
private IHttpClient GetHttpClient(string cacheKey, IHttpClientBuilder builder)
{
var httpClient = _cacheHandlers.Get(cacheKey);
if (httpClient == null)
{
httpClient = builder.Create();
}
return httpClient;
}
private string GetCacheKey(HttpRequestMessage request, IHttpClientBuilder builder)
{
string baseUrl = $"{request.RequestUri.Scheme}://{request.RequestUri.Authority}";
return baseUrl;
}
}
}
\ No newline at end of file
using System;
using System.Net.Http;
using System.Threading.Tasks;
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
{
public interface IHttpClient
{
HttpClient Client { get; }
Task<HttpResponseMessage> SendAsync(HttpRequestMessage request);
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using System.Net;
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
{
public interface IHttpClientBuilder
{
/// <summary>
/// Creates the <see cref="HttpClient"/>
/// </summary>
IHttpClient Create();
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
{
public interface IHttpClientCache
{
bool Exists(string id);
IHttpClient Get(string id);
void Remove(string id);
void Set(string id, IHttpClient handler, TimeSpan expirationTime);
}
}
using System.Net.Http;
using System.Threading.Tasks;
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
{
public interface IHttpRequester
{
Task<HttpResponseMessage> GetResponse(HttpRequestMessage request);
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester
{
public class MemoryHttpClientCache : IHttpClientCache
{
private readonly ConcurrentDictionary<string, ConcurrentQueue<IHttpClient>> _httpClientsCache = new ConcurrentDictionary<string, ConcurrentQueue<IHttpClient>>();
public void Set(string id, IHttpClient client, TimeSpan expirationTime)
{
ConcurrentQueue<IHttpClient> connectionQueue;
if (_httpClientsCache.TryGetValue(id, out connectionQueue))
{
connectionQueue.Enqueue(client);
}
else
{
connectionQueue = new ConcurrentQueue<IHttpClient>();
connectionQueue.Enqueue(client);
_httpClientsCache.TryAdd(id, connectionQueue);
}
}
public bool Exists(string id)
{
ConcurrentQueue<IHttpClient> connectionQueue;
return _httpClientsCache.TryGetValue(id, out connectionQueue);
}
public IHttpClient Get(string id)
{
IHttpClient client= null;
ConcurrentQueue<IHttpClient> connectionQueue;
if (_httpClientsCache.TryGetValue(id, out connectionQueue))
{
connectionQueue.TryDequeue(out client);
}
return client;
}
public void Remove(string id)
{
ConcurrentQueue<IHttpClient> connectionQueue;
_httpClientsCache.TryRemove(id, out connectionQueue);
}
}
}
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester.Middleware
{
public class HttpRequesterMiddleware : GatewayProxyMiddleware
{
private readonly RequestDelegate _next;
private readonly IHttpRequester _requester;
private readonly ILogger _logger;
public HttpRequesterMiddleware(RequestDelegate next,
ILoggerFactory loggerFactory,
IHttpRequester requester,
IRequestScopedDataRepository requestScopedDataRepository)
:base(requestScopedDataRepository)
{
_next = next;
_requester = requester;
_logger = loggerFactory.CreateLogger<HttpRequesterMiddleware>();
}
public async Task Invoke(HttpContext context)
{
_logger.LogDebug("started calling requester middleware");
var response = await _requester.GetResponse(Request);
_logger.LogDebug("setting http response message");
SetHttpResponseMessageThisRequest(response);
_logger.LogDebug("returning to calling middleware");
}
}
}
\ No newline at end of file
using Microsoft.AspNetCore.Builder;
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester.Middleware
{
public static class HttpRequesterMiddlewareExtensions
{
public static IApplicationBuilder UseHttpRequesterMiddleware(this IApplicationBuilder builder)
{
return builder.UseMiddleware<HttpRequesterMiddleware>();
}
}
}
\ No newline at end of file
......@@ -143,7 +143,4 @@
<Generator>RazorGenerator</Generator>
</None>
</ItemGroup>
<ItemGroup>
<Folder Include="Dashboard\Consul\" />
</ItemGroup>
</Project>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment