Commit 64dcc7fc authored by Savorboard's avatar Savorboard

rename

parent 2825ccea
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
{
public class KafkaJobProcessor : IJobProcessor
{
private readonly KafkaOptions _kafkaOptions;
private readonly CancellationTokenSource _cts;
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly TimeSpan _pollingDelay;
public KafkaJobProcessor(
IOptions<CapOptions> capOptions,
IOptions<KafkaOptions> kafkaOptions,
ILogger<KafkaJobProcessor> logger,
IServiceProvider provider)
{
_logger = logger;
_kafkaOptions = kafkaOptions.Value;
_provider = provider;
_cts = new CancellationTokenSource();
_pollingDelay = TimeSpan.FromSeconds(capOptions.Value.PollingDelay);
}
public bool Waiting { get; private set; }
public Task ProcessAsync(ProcessingContext context)
{
if (context == null) throw new ArgumentNullException(nameof(context));
context.ThrowIfStopping();
return ProcessCoreAsync(context);
}
public async Task ProcessCoreAsync(ProcessingContext context)
{
try
{
var worked = await Step(context);
context.ThrowIfStopping();
Waiting = true;
if (!worked)
{
var token = GetTokenToWaitOn(context);
}
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay);
}
finally
{
Waiting = false;
}
}
protected virtual CancellationToken GetTokenToWaitOn(ProcessingContext context)
{
return context.CancellationToken;
}
private async Task<bool> Step(ProcessingContext context)
{
throw new NotImplementedException();
// using (var scopedContext = context.CreateScope())
// {
// var provider = scopedContext.Provider;
// var messageStore = provider.GetRequiredService<ICapMessageStore>();
// var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync();
// if (message == null) return true;
// try
// {
// var sp = Stopwatch.StartNew();
// message.StatusName = StatusName.Processing;
// await messageStore.UpdateSentMessageAsync(message);
// await ExecuteJobAsync(message.KeyName, message.Content);
// sp.Stop();
// message.StatusName = StatusName.Succeeded;
// await messageStore.UpdateSentMessageAsync(message);
// _logger.JobExecuted(sp.Elapsed.TotalSeconds);
// }
// catch (Exception ex)
// {
// _logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex);
// return false;
// }
// }
// return true;
}
private Task ExecuteJobAsync(string topic, string content)
{
var config = _kafkaOptions.AsRdkafkaConfig();
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
producer.ProduceAsync(topic, null, content);
producer.Flush();
}
return Task.CompletedTask;
}
}
}
\ No newline at end of file
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Job.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitJobProcessor : IMessageJobProcessor
{
private readonly RabbitMQOptions _rabbitMqOptions;
private readonly CancellationTokenSource _cts;
private readonly IStateChanger _stateChanger;
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly TimeSpan _pollingDelay;
public RabbitJobProcessor(
IOptions<CapOptions> capOptions,
IOptions<RabbitMQOptions> rabbitMQOptions,
ILogger<RabbitJobProcessor> logger,
IStateChanger stateChanger,
IServiceProvider provider)
{
_logger = logger;
_rabbitMqOptions = rabbitMQOptions.Value;
_provider = provider;
_stateChanger = stateChanger;
_cts = new CancellationTokenSource();
var capOptions1 = capOptions.Value;
_pollingDelay = TimeSpan.FromSeconds(capOptions1.PollingDelay);
}
public bool Waiting { get; private set; }
public Task ProcessAsync(ProcessingContext context)
{
if (context == null) throw new ArgumentNullException(nameof(context));
System.Diagnostics.Debug.WriteLine("RabbitMQ Processor 执行: " + DateTime.Now);
context.ThrowIfStopping();
return ProcessCoreAsync(context);
}
public async Task ProcessCoreAsync(ProcessingContext context)
{
try
{
var worked = await Step(context);
context.ThrowIfStopping();
Waiting = true;
if (!worked)
{
var token = GetTokenToWaitOn(context);
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.SentPulseEvent, token.WaitHandle, _pollingDelay);
}
}
finally
{
Waiting = false;
}
}
protected virtual CancellationToken GetTokenToWaitOn(ProcessingContext context)
{
return context.CancellationToken;
}
private async Task<bool> Step(ProcessingContext context)
{
var fetched = default(IFetchedMessage);
using (var scopedContext = context.CreateScope())
{
var provider = scopedContext.Provider;
var connection = provider.GetRequiredService<IStorageConnection>();
if ((fetched = await connection.FetchNextMessageAsync()) != null)
{
using (fetched)
{
var message = await connection.GetSentMessageAsync(fetched.MessageId);
try
{
var sp = Stopwatch.StartNew();
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
if (message.Retries > 0)
{
_logger.JobRetrying(message.Retries);
}
var result = ExecuteJob(message.KeyName, message.Content);
sp.Stop();
var newState = default(IState);
if (!result.Succeeded)
{
var shouldRetry = await UpdateJobForRetryAsync(message, connection);
if (shouldRetry)
{
newState = new ScheduledState();
_logger.JobFailedWillRetry(result.Exception);
}
else
{
newState = new FailedState();
_logger.JobFailed(result.Exception);
}
}
else
{
newState = new SucceededState();
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
fetched.RemoveFromQueue();
if (result.Succeeded)
{
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex);
return false;
}
}
}
}
return fetched != null;
}
private async Task<bool> UpdateJobForRetryAsync(CapSentMessage message, IStorageConnection connection)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var now = DateTime.UtcNow;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.LastRun = due;
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);
await transaction.CommitAsync();
}
return true;
}
private OperateResult ExecuteJob(string topic, string content)
{
var factory = new ConnectionFactory()
{
HostName = _rabbitMqOptions.HostName,
UserName = _rabbitMqOptions.UserName,
Port = _rabbitMqOptions.Port,
Password = _rabbitMqOptions.Password,
VirtualHost = _rabbitMqOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMqOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMqOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMqOptions.SocketWriteTimeout
};
try
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_rabbitMqOptions.TopicExchangeName, _rabbitMqOptions.EXCHANGE_TYPE);
channel.BasicPublish(exchange: _rabbitMqOptions.TopicExchangeName,
routingKey: topic,
basicProperties: null,
body: body);
}
return OperateResult.Success;
}
catch (Exception ex)
{
return OperateResult.Failed(ex, new OperateError() { Code = ex.HResult.ToString(), Description = ex.Message });
}
}
}
}
\ No newline at end of file
...@@ -4,7 +4,7 @@ using System.Text; ...@@ -4,7 +4,7 @@ using System.Text;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public interface IAdditionalProcessor : IJobProcessor public interface IAdditionalProcessor : IProcessor
{ {
} }
......
...@@ -4,7 +4,7 @@ using System.Text; ...@@ -4,7 +4,7 @@ using System.Text;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public interface IMessageJobProcessor : IJobProcessor public interface IMessageProcessor : IProcessor
{ {
bool Waiting { get; } bool Waiting { get; }
} }
......
...@@ -17,8 +17,8 @@ namespace DotNetCore.CAP.Processor ...@@ -17,8 +17,8 @@ namespace DotNetCore.CAP.Processor
private readonly CancellationTokenSource _cts; private readonly CancellationTokenSource _cts;
private readonly CapOptions _options; private readonly CapOptions _options;
private IJobProcessor[] _processors; private IProcessor[] _processors;
private IList<IMessageJobProcessor> _messageProcessors; private IList<IMessageProcessor> _messageProcessors;
private ProcessingContext _context; private ProcessingContext _context;
private Task _compositeTask; private Task _compositeTask;
private bool _disposed; private bool _disposed;
...@@ -100,17 +100,17 @@ namespace DotNetCore.CAP.Processor ...@@ -100,17 +100,17 @@ namespace DotNetCore.CAP.Processor
return true; return true;
} }
private IJobProcessor InfiniteRetry(IJobProcessor inner) private IProcessor InfiniteRetry(IProcessor inner)
{ {
return new InfiniteRetryProcessor(inner, _loggerFactory); return new InfiniteRetryProcessor(inner, _loggerFactory);
} }
private IJobProcessor[] GetProcessors(int processorCount) private IProcessor[] GetProcessors(int processorCount)
{ {
var returnedProcessors = new List<IJobProcessor>(); var returnedProcessors = new List<IProcessor>();
for (int i = 0; i < processorCount; i++) for (int i = 0; i < processorCount; i++)
{ {
var messageProcessors = _provider.GetService<IMessageJobProcessor>(); var messageProcessors = _provider.GetService<IMessageProcessor>();
_messageProcessors.Add(messageProcessors); _messageProcessors.Add(messageProcessors);
} }
returnedProcessors.AddRange(_messageProcessors); returnedProcessors.AddRange(_messageProcessors);
......
...@@ -4,13 +4,13 @@ using Microsoft.Extensions.Logging; ...@@ -4,13 +4,13 @@ using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public class InfiniteRetryProcessor : IJobProcessor public class InfiniteRetryProcessor : IProcessor
{ {
private readonly IJobProcessor _inner; private readonly IProcessor _inner;
private readonly ILogger _logger; private readonly ILogger _logger;
public InfiniteRetryProcessor( public InfiniteRetryProcessor(
IJobProcessor inner, IProcessor inner,
ILoggerFactory loggerFactory) ILoggerFactory loggerFactory)
{ {
_inner = inner; _inner = inner;
......
...@@ -8,7 +8,7 @@ using Microsoft.Extensions.Options; ...@@ -8,7 +8,7 @@ using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public class DefaultMessageJobProcessor : IMessageJobProcessor public class DefaultMessageProcessor : IMessageProcessor
{ {
private readonly IQueueExecutorFactory _queueExecutorFactory; private readonly IQueueExecutorFactory _queueExecutorFactory;
private readonly IServiceProvider _provider; private readonly IServiceProvider _provider;
...@@ -19,11 +19,11 @@ namespace DotNetCore.CAP.Processor ...@@ -19,11 +19,11 @@ namespace DotNetCore.CAP.Processor
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public DefaultMessageJobProcessor( public DefaultMessageProcessor(
IServiceProvider provider, IServiceProvider provider,
IQueueExecutorFactory queueExecutorFactory, IQueueExecutorFactory queueExecutorFactory,
IOptions<CapOptions> capOptions, IOptions<CapOptions> capOptions,
ILogger<DefaultMessageJobProcessor> logger) ILogger<DefaultMessageProcessor> logger)
{ {
_logger = logger; _logger = logger;
_queueExecutorFactory = queueExecutorFactory; _queueExecutorFactory = queueExecutorFactory;
......
...@@ -10,7 +10,7 @@ using Microsoft.Extensions.Options; ...@@ -10,7 +10,7 @@ using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public class PublishQueuer : IJobProcessor public class PublishQueuer : IProcessor
{ {
private ILogger _logger; private ILogger _logger;
private CapOptions _options; private CapOptions _options;
...@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.Processor ...@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.Processor
private IServiceProvider _provider; private IServiceProvider _provider;
private TimeSpan _pollingDelay; private TimeSpan _pollingDelay;
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public PublishQueuer( public PublishQueuer(
ILogger<PublishQueuer> logger, ILogger<PublishQueuer> logger,
...@@ -59,7 +59,7 @@ namespace DotNetCore.CAP.Processor ...@@ -59,7 +59,7 @@ namespace DotNetCore.CAP.Processor
context.ThrowIfStopping(); context.ThrowIfStopping();
DefaultMessageJobProcessor.PulseEvent.Set(); DefaultMessageProcessor.PulseEvent.Set();
await WaitHandleEx.WaitAnyAsync(PulseEvent, await WaitHandleEx.WaitAnyAsync(PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay); context.CancellationToken.WaitHandle, _pollingDelay);
......
...@@ -10,7 +10,7 @@ using Microsoft.Extensions.Options; ...@@ -10,7 +10,7 @@ using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public class SubscribeQueuer : IJobProcessor public class SubscribeQueuer : IProcessor
{ {
private ILogger _logger; private ILogger _logger;
private CapOptions _options; private CapOptions _options;
...@@ -59,7 +59,7 @@ namespace DotNetCore.CAP.Processor ...@@ -59,7 +59,7 @@ namespace DotNetCore.CAP.Processor
context.ThrowIfStopping(); context.ThrowIfStopping();
DefaultMessageJobProcessor.PulseEvent.Set(); DefaultMessageProcessor.PulseEvent.Set();
await WaitHandleEx.WaitAnyAsync(PulseEvent, await WaitHandleEx.WaitAnyAsync(PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay); context.CancellationToken.WaitHandle, _pollingDelay);
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
namespace DotNetCore.CAP.Processor namespace DotNetCore.CAP.Processor
{ {
public interface IJobProcessor public interface IProcessor
{ {
Task ProcessAsync(ProcessingContext context); Task ProcessAsync(ProcessingContext context);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment