Commit cedd9d9f authored by yangxiaodong's avatar yangxiaodong

Add RabbitMQOptions and remove old options in CAP project.

parent f2cfaf16
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.RabbitMQ;
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapBuilderExtensions
{
public static CapBuilder AddRabbitMQ(this CapBuilder builder, Action<RabbitMQOptions> setupOptions)
{
if (setupOptions == null) throw new ArgumentNullException(nameof(setupOptions));
builder.Services.Configure(setupOptions);
builder.Services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
builder.Services.AddTransient<IJobProcessor, RabbitJobProcessor>();
return builder;
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitMQOptions
{
/// <summary>
/// Default value for connection attempt timeout, in milliseconds.
/// </summary>
public const int DefaultConnectionTimeout = 30 * 1000;
/// <summary>
/// Default password (value: "guest").
/// </summary>
/// <remarks>PLEASE KEEP THIS MATCHING THE DOC ABOVE.</remarks>
public const string DefaultPass = "guest";
/// <summary>
/// Default user name (value: "guest").
/// </summary>
/// <remarks>PLEASE KEEP THIS MATCHING THE DOC ABOVE.</remarks>
public const string DefaultUser = "guest";
/// <summary>
/// Default virtual host (value: "/").
/// </summary>
/// <remarks> PLEASE KEEP THIS MATCHING THE DOC ABOVE.</remarks>
public const string DefaultVHost = "/";
/// <summary>The host to connect to.</summary>
public string HostName { get; set; } = "localhost";
/// <summary>
/// Password to use when authenticating to the server.
/// </summary>
public string Password { get; set; } = DefaultPass;
/// <summary>
/// Username to use when authenticating to the server.
/// </summary>
public string UserName { get; set; } = DefaultUser;
/// <summary>
/// Virtual host to access during this connection.
/// </summary>
public string VirtualHost { get; set; } = DefaultVHost;
/// <summary>
/// Timeout setting for connection attempts (in milliseconds).
/// </summary>
public int RequestedConnectionTimeout { get; set; } = DefaultConnectionTimeout;
/// <summary>
/// Timeout setting for socket read operations (in milliseconds).
/// </summary>
public int SocketReadTimeout { get; set; } = DefaultConnectionTimeout;
/// <summary>
/// Timeout setting for socket write operations (in milliseconds).
/// </summary>
public int SocketWriteTimeout { get; set; } = DefaultConnectionTimeout;
/// <summary>
/// The port to connect on. <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
/// indicates the default for the protocol should be used.
/// </summary>
public int Port { get; set; } = -1;
}
}
...@@ -6,15 +6,11 @@ ...@@ -6,15 +6,11 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="5.0.1-rc1" /> <PackageReference Include="RabbitMQ.Client" Version="4.1.3" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" /> <ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Folder Include="Microsoft.Extensions.DependencyInjection\" />
</ItemGroup>
</Project> </Project>
\ No newline at end of file
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitJobProcessor : IJobProcessor
{
private readonly CapOptions _capOptions;
private readonly RabbitMQOptions _rabbitMQOptions;
private readonly CancellationTokenSource _cts;
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private TimeSpan _pollingDelay;
public RabbitJobProcessor(
IOptions<CapOptions> capOptions,
IOptions<RabbitMQOptions> rabbitMQOptions,
IOptions<RabbitMQOptions> options,
ILogger<RabbitJobProcessor> logger,
IServiceProvider provider)
{
_logger = logger;
_capOptions = capOptions.Value;
_rabbitMQOptions = rabbitMQOptions.Value;
_provider = provider;
_cts = new CancellationTokenSource();
_pollingDelay = TimeSpan.FromSeconds(_capOptions.PollingDelay);
}
public bool Waiting { get; private set; }
public Task ProcessAsync(ProcessingContext context)
{
if (context == null) throw new ArgumentNullException(nameof(context));
context.ThrowIfStopping();
return ProcessCoreAsync(context);
}
public async Task ProcessCoreAsync(ProcessingContext context)
{
try
{
var worked = await Step(context);
context.ThrowIfStopping();
Waiting = true;
if (!worked)
{
var token = GetTokenToWaitOn(context);
}
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay);
}
finally
{
Waiting = false;
}
}
protected virtual CancellationToken GetTokenToWaitOn(ProcessingContext context)
{
return context.CancellationToken;
}
private async Task<bool> Step(ProcessingContext context)
{
using (var scopedContext = context.CreateScope())
{
var provider = scopedContext.Provider;
var messageStore = provider.GetRequiredService<ICapMessageStore>();
try
{
var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync();
if (message != null)
{
var sp = Stopwatch.StartNew();
message.StateName = StateName.Processing;
await messageStore.UpdateSentMessageAsync(message);
var jobResult = ExecuteJob(message.KeyName, message.Content);
sp.Stop();
if (!jobResult)
{
_logger.JobFailed(new Exception("topic send failed"));
}
else
{
//TODO : the state will be deleted when release.
message.StateName = StateName.Succeeded;
await messageStore.UpdateSentMessageAsync(message);
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
}
}
catch (Exception)
{
return false;
}
}
return true;
}
private bool ExecuteJob(string topic, string content)
{
try
{
var factory = new ConnectionFactory()
{
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs",
type: "topic");
var body = Encoding.UTF8.GetBytes(content);
channel.BasicPublish(exchange: "topic_logs",
routingKey: topic,
basicProperties: null,
body: body);
return true;
}
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(topic, ex);
return false;
}
}
}
}
\ No newline at end of file
using System;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.RabbitMQ
{
internal static class LoggerExtensions
{
private static Action<ILogger, Exception> _collectingExpiredEntities;
private static Action<ILogger, Exception> _installing;
private static Action<ILogger, Exception> _installingError;
private static Action<ILogger, Exception> _installingSuccess;
private static Action<ILogger, Exception> _jobFailed;
private static Action<ILogger, Exception> _jobFailedWillRetry;
private static Action<ILogger, double, Exception> _jobExecuted;
private static Action<ILogger, int, Exception> _jobRetrying;
private static Action<ILogger, int, Exception> _jobCouldNotBeLoaded;
private static Action<ILogger, string, Exception> _exceptionOccuredWhileExecutingJob;
static LoggerExtensions()
{
_collectingExpiredEntities = LoggerMessage.Define(
LogLevel.Debug,
1,
"Collecting expired entities.");
_installing = LoggerMessage.Define(
LogLevel.Debug,
1,
"Installing Jobs SQL objects...");
_installingError = LoggerMessage.Define(
LogLevel.Warning,
2,
"Exception occurred during automatic migration. Retrying...");
_installingSuccess = LoggerMessage.Define(
LogLevel.Debug,
3,
"Jobs SQL objects installed.");
_jobFailed = LoggerMessage.Define(
LogLevel.Warning,
1,
"Job failed to execute.");
_jobFailedWillRetry = LoggerMessage.Define(
LogLevel.Warning,
2,
"Job failed to execute. Will retry.");
_jobRetrying = LoggerMessage.Define<int>(
LogLevel.Debug,
3,
"Retrying a job: {Retries}...");
_jobExecuted = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Job executed. Took: {Seconds} secs.");
_jobCouldNotBeLoaded = LoggerMessage.Define<int>(
LogLevel.Warning,
5,
"Could not load a job: '{JobId}'.");
_exceptionOccuredWhileExecutingJob = LoggerMessage.Define<string>(
LogLevel.Error,
6,
"An exception occured while trying to execute a job: '{JobId}'. " +
"Requeuing for another retry.");
}
public static void CollectingExpiredEntities(this ILogger logger)
{
_collectingExpiredEntities(logger, null);
}
public static void Installing(this ILogger logger)
{
_installing(logger, null);
}
public static void InstallingError(this ILogger logger, Exception ex)
{
_installingError(logger, ex);
}
public static void InstallingSuccess(this ILogger logger)
{
_installingSuccess(logger, null);
}
public static void JobFailed(this ILogger logger, Exception ex)
{
_jobFailed(logger, ex);
}
public static void JobFailedWillRetry(this ILogger logger, Exception ex)
{
_jobFailedWillRetry(logger, ex);
}
public static void JobRetrying(this ILogger logger, int retries)
{
_jobRetrying(logger, retries, null);
}
public static void JobExecuted(this ILogger logger, double seconds)
{
_jobExecuted(logger, seconds, null);
}
public static void JobCouldNotBeLoaded(this ILogger logger, int jobId, Exception ex)
{
_jobCouldNotBeLoaded(logger, jobId, ex);
}
public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, string jobId, Exception ex)
{
_exceptionOccuredWhileExecutingJob(logger, jobId, ex);
}
}
}
\ No newline at end of file
...@@ -10,28 +10,38 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -10,28 +10,38 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
public const string TYPE = "topic"; public const string TYPE = "topic";
private string _queueName;
private readonly string _exchange; private readonly string _exchange;
private readonly string _hostName; private readonly RabbitMQOptions _rabbitMQOptions;
private IConnectionFactory _connectionFactory; private IConnectionFactory _connectionFactory;
private IConnection _connection; private IConnection _connection;
private IModel _channel; private IModel _channel;
private string _queueName;
public event EventHandler<MessageBase> MessageReceieved; public event EventHandler<MessageBase> MessageReceieved;
public RabbitMQConsumerClient(string exchange, string hostName) public RabbitMQConsumerClient(string exchange, RabbitMQOptions options)
{ {
_exchange = exchange; _exchange = exchange;
_hostName = hostName; _rabbitMQOptions = options;
InitClient(); InitClient();
} }
private void InitClient() private void InitClient()
{ {
_connectionFactory = new ConnectionFactory { HostName = _hostName }; _connectionFactory = new ConnectionFactory()
{
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};
_connection = _connectionFactory.CreateConnection(); _connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel(); _channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: _exchange, type: TYPE); _channel.ExchangeDeclare(exchange: _exchange, type: TYPE);
...@@ -40,8 +50,6 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -40,8 +50,6 @@ namespace DotNetCore.CAP.RabbitMQ
public void Listening(TimeSpan timeout) public void Listening(TimeSpan timeout)
{ {
// Task.Delay(timeout).Wait();
var consumer = new EventingBasicConsumer(_channel); var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived; consumer.Received += OnConsumerReceived;
_channel.BasicConsume(_queueName, true, consumer); _channel.BasicConsume(_queueName, true, consumer);
......
namespace DotNetCore.CAP.RabbitMQ using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.RabbitMQ
{ {
public class RabbitMQConsumerClientFactory : IConsumerClientFactory public class RabbitMQConsumerClientFactory : IConsumerClientFactory
{ {
public IConsumerClient Create(string groupId, string clientHostAddress) private readonly RabbitMQOptions _rabbitMQOptions;
public RabbitMQConsumerClientFactory(IOptions<RabbitMQOptions> rabbitMQOptions)
{ {
return new RabbitMQConsumerClient(groupId, clientHostAddress); _rabbitMQOptions = rabbitMQOptions.Value;
}
public IConsumerClient Create(string groupId)
{
return new RabbitMQConsumerClient(groupId, _rabbitMQOptions);
} }
} }
} }
\ No newline at end of file
using System;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitMQProducerClient : ICapProducerService
{
private readonly CapOptions _options;
private readonly ILogger _logger;
public RabbitMQProducerClient(IOptions<CapOptions> options, ILoggerFactory loggerFactory)
{
_options = options.Value;
_logger = loggerFactory.CreateLogger(nameof(RabbitMQProducerClient));
}
public Task SendAsync(string topic, string content)
{
var factory = new ConnectionFactory() { HostName = _options.BrokerUrlList };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs",
type: "topic");
var body = Encoding.UTF8.GetBytes(content);
channel.BasicPublish(exchange: "topic_logs",
routingKey: topic,
basicProperties: null,
body: body);
return Task.CompletedTask;
}
}
public Task SendAsync<T>(string topic, T contentObj)
{
throw new NotImplementedException();
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment