Commit 391adbf1 authored by yangxiaodong's avatar yangxiaodong

server jobs fro process startup

parent 8c69e4d2
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Job; using Cap.Consistency.Job;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Cap.Consistency.Infrastructure; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
namespace Cap.Consistency namespace Cap.Consistency
...@@ -16,8 +16,8 @@ namespace Cap.Consistency ...@@ -16,8 +16,8 @@ namespace Cap.Consistency
private ILogger _logger; private ILogger _logger;
private ILoggerFactory _loggerFactory; private ILoggerFactory _loggerFactory;
private IServiceProvider _provider; private IServiceProvider _provider;
private IJobProcessor[] _processors;
private CancellationTokenSource _cts; private CancellationTokenSource _cts;
private IJobProcessor _processor;
private ConsistencyOptions _options; private ConsistencyOptions _options;
private ProcessingContext _context; private ProcessingContext _context;
private DefaultCronJobRegistry _defaultJobRegistry; private DefaultCronJobRegistry _defaultJobRegistry;
...@@ -42,7 +42,8 @@ namespace Cap.Consistency ...@@ -42,7 +42,8 @@ namespace Cap.Consistency
public void Start() { public void Start() {
var processorCount = Environment.ProcessorCount; var processorCount = Environment.ProcessorCount;
_processor = _provider.GetService<IJobProcessor>(); processorCount = 1;
_processors = GetProcessors(processorCount);
_logger.ServerStarting(processorCount, 1); _logger.ServerStarting(processorCount, 1);
_context = new ProcessingContext( _context = new ProcessingContext(
...@@ -50,9 +51,10 @@ namespace Cap.Consistency ...@@ -50,9 +51,10 @@ namespace Cap.Consistency
_defaultJobRegistry, _defaultJobRegistry,
_cts.Token); _cts.Token);
_compositeTask = Task.Run(() => { var processorTasks = _processors
InfiniteRetry(_processor).ProcessAsync(_context); .Select(p => InfiniteRetry(p))
}); .Select(p => p.ProcessAsync(_context));
_compositeTask = Task.WhenAll(processorTasks);
} }
public void Dispose() { public void Dispose() {
...@@ -76,7 +78,25 @@ namespace Cap.Consistency ...@@ -76,7 +78,25 @@ namespace Cap.Consistency
private IJobProcessor InfiniteRetry(IJobProcessor inner) { private IJobProcessor InfiniteRetry(IJobProcessor inner) {
return new InfiniteRetryProcessor(inner, _loggerFactory); return new InfiniteRetryProcessor(inner, _loggerFactory);
} }
private IJobProcessor[] GetProcessors(int processorCount) {
var returnedProcessors = new List<IJobProcessor>();
for (int i = 0; i < processorCount; i++) {
var processors = _provider.GetServices<IJobProcessor>();
foreach (var processor in processors) {
if (processor is CronJobProcessor) {
if (i == 0) // only add first cronJob
returnedProcessors.Add(processor);
}
else {
returnedProcessors.Add(processor);
}
}
}
return returnedProcessors.ToArray();
}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment