Commit e7c9206d authored by yangxiaodong's avatar yangxiaodong

add producer proecess jobs.

parent 17b28e66
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Job;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Cap.Consistency.Kafka
{
public class KafkaJobProcessor : IJobProcessor
{
private readonly ConsistencyOptions _options;
private readonly CancellationTokenSource _cts;
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
//internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
private TimeSpan _pollingDelay;
public KafkaJobProcessor(
IOptions<ConsistencyOptions> options,
ILogger<KafkaJobProcessor> logger,
IServiceProvider provider) {
_logger = logger;
_options = options.Value;
_provider = provider;
_cts = new CancellationTokenSource();
_pollingDelay = TimeSpan.FromSeconds(_options.PollingDelay);
}
public bool Waiting { get; private set; }
public Task ProcessAsync(ProcessingContext context) {
if (context == null) throw new ArgumentNullException(nameof(context));
context.ThrowIfStopping();
return ProcessCoreAsync(context);
}
public async Task ProcessCoreAsync(ProcessingContext context) {
try {
var worked = await Step(context);
context.ThrowIfStopping();
Waiting = true;
if (!worked) {
var token = GetTokenToWaitOn(context);
}
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay);
}
finally {
Waiting = false;
}
}
protected virtual CancellationToken GetTokenToWaitOn(ProcessingContext context) {
return context.CancellationToken;
}
private async Task<bool> Step(ProcessingContext context) {
using (var scopedContext = context.CreateScope()) {
var provider = scopedContext.Provider;
var messageStore = provider.GetRequiredService<IConsistencyMessageStore>();
try {
var message = await messageStore.GetFirstEnqueuedMessageAsync(_cts.Token);
if (message != null) {
var sp = Stopwatch.StartNew();
message.Status = MessageStatus.Processing;
await messageStore.UpdateAsync(message, _cts.Token);
var jobResult = ExecuteJob(message.Topic, message.Payload);
sp.Stop();
if (!jobResult) {
_logger.JobFailed(new Exception("topic send failed"));
}
else {
message.Status = MessageStatus.Successed;
await messageStore.UpdateAsync(message, _cts.Token);
//await messageStore.DeleteAsync(message, _cts.Token);
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
}
}
catch (Exception ex) {
return false;
}
}
return true;
}
private bool ExecuteJob(string topic, string content) {
try {
var config = new Dictionary<string, object> { { "bootstrap.servers", _options.BrokerUrlList } };
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) {
var message = producer.ProduceAsync(topic, null, content).Result;
if (message.Error.Code == ErrorCode.NoError) {
return true;
}
else {
return false;
}
}
}
catch (Exception ex) {
_logger.ExceptionOccuredWhileExecutingJob(topic, ex);
return false;
}
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency.Kafka
{
internal static class LoggerExtensions
{
private static Action<ILogger, Exception> _collectingExpiredEntities;
private static Action<ILogger, Exception> _installing;
private static Action<ILogger, Exception> _installingError;
private static Action<ILogger, Exception> _installingSuccess;
private static Action<ILogger, Exception> _jobFailed;
private static Action<ILogger, Exception> _jobFailedWillRetry;
private static Action<ILogger, double, Exception> _jobExecuted;
private static Action<ILogger, int, Exception> _jobRetrying;
private static Action<ILogger, int, Exception> _jobCouldNotBeLoaded;
private static Action<ILogger, string, Exception> _exceptionOccuredWhileExecutingJob;
static LoggerExtensions() {
_collectingExpiredEntities = LoggerMessage.Define(
LogLevel.Debug,
1,
"Collecting expired entities.");
_installing = LoggerMessage.Define(
LogLevel.Debug,
1,
"Installing Jobs SQL objects...");
_installingError = LoggerMessage.Define(
LogLevel.Warning,
2,
"Exception occurred during automatic migration. Retrying...");
_installingSuccess = LoggerMessage.Define(
LogLevel.Debug,
3,
"Jobs SQL objects installed.");
_jobFailed = LoggerMessage.Define(
LogLevel.Warning,
1,
"Job failed to execute.");
_jobFailedWillRetry = LoggerMessage.Define(
LogLevel.Warning,
2,
"Job failed to execute. Will retry.");
_jobRetrying = LoggerMessage.Define<int>(
LogLevel.Debug,
3,
"Retrying a job: {Retries}...");
_jobExecuted = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Job executed. Took: {Seconds} secs.");
_jobCouldNotBeLoaded = LoggerMessage.Define<int>(
LogLevel.Warning,
5,
"Could not load a job: '{JobId}'.");
_exceptionOccuredWhileExecutingJob = LoggerMessage.Define<string>(
LogLevel.Error,
6,
"An exception occured while trying to execute a job: '{JobId}'. " +
"Requeuing for another retry.");
}
public static void CollectingExpiredEntities(this ILogger logger) {
_collectingExpiredEntities(logger, null);
}
public static void Installing(this ILogger logger) {
_installing(logger, null);
}
public static void InstallingError(this ILogger logger, Exception ex) {
_installingError(logger, ex);
}
public static void InstallingSuccess(this ILogger logger) {
_installingSuccess(logger, null);
}
public static void JobFailed(this ILogger logger, Exception ex) {
_jobFailed(logger, ex);
}
public static void JobFailedWillRetry(this ILogger logger, Exception ex) {
_jobFailedWillRetry(logger, ex);
}
public static void JobRetrying(this ILogger logger, int retries) {
_jobRetrying(logger, retries, null);
}
public static void JobExecuted(this ILogger logger, double seconds) {
_jobExecuted(logger, seconds, null);
}
public static void JobCouldNotBeLoaded(this ILogger logger, int jobId, Exception ex) {
_jobCouldNotBeLoaded(logger, jobId, ex);
}
public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, string jobId, Exception ex) {
_exceptionOccuredWhileExecutingJob(logger, jobId, ex);
}
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency
{
public class DefaultProducerClient : IProducerClient
{
private readonly IConsistencyMessageStore _store;
private readonly ILogger _logger;
private readonly CancellationTokenSource _cts;
public DefaultProducerClient(
IConsistencyMessageStore store,
ILogger<DefaultProducerClient> logger) {
_store = store;
_logger = logger;
_cts = new CancellationTokenSource();
}
public Task SendAsync(string topic, string content) {
if (topic == null) throw new ArgumentNullException(nameof(topic));
if (content == null) throw new ArgumentNullException(nameof(content));
return StoreMessage(topic, content);
}
public Task SendAsync<T>(string topic, T obj) {
if (topic == null) throw new ArgumentNullException(nameof(topic));
var content = Helper.ToJson(obj);
if (content == null)
throw new InvalidCastException(nameof(obj));
return StoreMessage(topic, content);
}
private async Task StoreMessage(string topic, string content) {
var message = new ConsistencyMessage {
Topic = topic,
Payload = content
};
await _store.CreateAsync(message, _cts.Token);
WaitHandleEx.PulseEvent.Set();
if (_logger.IsEnabled(LogLevel.Debug)) {
_logger.LogDebug("Enqueuing a topic to be store. topic:{topic}, content:{content}", topic, content);
}
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
namespace Cap.Consistency
{
public interface IProducerClient
{
Task SendAsync(string topic, string content);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;
namespace Cap.Consistency.Infrastructure
{
internal static class Helper
{
private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private static JsonSerializerSettings SerializerSettings;
public static void SetSerializerSettings(JsonSerializerSettings setting) {
SerializerSettings = setting;
}
public static string ToJson(object value) {
return value != null
? JsonConvert.SerializeObject(value, SerializerSettings)
: null;
}
public static T FromJson<T>(string value) {
return value != null
? JsonConvert.DeserializeObject<T>(value, SerializerSettings)
: default(T);
}
public static object FromJson(string value, Type type) {
if (type == null) throw new ArgumentNullException(nameof(type));
return value != null
? JsonConvert.DeserializeObject(value, type, SerializerSettings)
: null;
}
public static long ToTimestamp(DateTime value) {
var elapsedTime = value - Epoch;
return (long)elapsedTime.TotalSeconds;
}
public static DateTime FromTimestamp(long value) {
return Epoch.AddSeconds(value);
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Cap.Consistency.Infrastructure
{
public static class WaitHandleEx
{
public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public static Task WaitAnyAsync(WaitHandle handle1, WaitHandle handle2, TimeSpan timeout) {
var t1 = handle1.WaitOneAsync(timeout);
var t2 = handle2.WaitOneAsync(timeout);
return Task.WhenAny(t1, t2);
}
public static async Task<bool> WaitOneAsync(this WaitHandle handle, TimeSpan timeout) {
RegisteredWaitHandle registeredHandle = null;
try {
var tcs = new TaskCompletionSource<bool>();
registeredHandle = ThreadPool.RegisterWaitForSingleObject(
handle,
(state, timedOut) => ((TaskCompletionSource<bool>)state).TrySetResult(!timedOut),
tcs,
timeout,
true);
return await tcs.Task;
}
finally {
if (registeredHandle != null) {
registeredHandle.Unregister(null);
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment