Commit 9765e35c authored by yangxiaodong's avatar yangxiaodong

Rename class.

parent 30e52966
namespace Cap.Consistency.Consumer namespace Cap.Consistency.Consumer
{ {
public interface IConsumerHandler : ITopicServer public interface IConsumerHandler : IProcessingServer
{ {
} }
} }
\ No newline at end of file
using System; using System;
using System.Linq;
using System.Collections.Generic;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Cap.Consistency.Infrastructure; using Cap.Consistency.Infrastructure;
using Cap.Consistency.Store; using Cap.Consistency.Store;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace Cap.Consistency namespace Cap.Consistency
{ {
public abstract class BootstrapperBase : IBootstrapper public class DefaultBootstrapper : IBootstrapper
{ {
private IApplicationLifetime _appLifetime; private IApplicationLifetime _appLifetime;
private CancellationTokenSource _cts; private CancellationTokenSource _cts;
private CancellationTokenRegistration _ctsRegistration; private CancellationTokenRegistration _ctsRegistration;
private Task _bootstrappingTask; private Task _bootstrappingTask;
public BootstrapperBase( public DefaultBootstrapper(
ConsistencyOptions options, IOptions<ConsistencyOptions> options,
ConsistencyMessageManager storage, ConsistencyMessageManager storage,
ITopicServer server,
IApplicationLifetime appLifetime, IApplicationLifetime appLifetime,
IServiceProvider provider) { IServiceProvider provider) {
Options = options;
Options = options.Value;
Storage = storage; Storage = storage;
Server = server;
_appLifetime = appLifetime; _appLifetime = appLifetime;
Provider = provider; Provider = provider;
Servers = Provider.GetServices<IProcessingServer>();
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
_ctsRegistration = appLifetime.ApplicationStopping.Register(() => { _ctsRegistration = appLifetime.ApplicationStopping.Register(() => {
_cts.Cancel(); _cts.Cancel();
...@@ -41,7 +44,7 @@ namespace Cap.Consistency ...@@ -41,7 +44,7 @@ namespace Cap.Consistency
protected ConsistencyMessageManager Storage { get; } protected ConsistencyMessageManager Storage { get; }
protected ITopicServer Server { get; } protected IEnumerable<IProcessingServer> Servers { get; }
public IServiceProvider Provider { get; private set; } public IServiceProvider Provider { get; private set; }
...@@ -57,14 +60,24 @@ namespace Cap.Consistency ...@@ -57,14 +60,24 @@ namespace Cap.Consistency
await BootstrapCoreAsync(); await BootstrapCoreAsync();
if (_cts.IsCancellationRequested) return; if (_cts.IsCancellationRequested) return;
Server.Start(); foreach (var item in Servers) {
try {
item.Start();
}
catch (Exception) {
}
}
_ctsRegistration.Dispose(); _ctsRegistration.Dispose();
_cts.Dispose(); _cts.Dispose();
} }
public virtual Task BootstrapCoreAsync() { public virtual Task BootstrapCoreAsync() {
_appLifetime.ApplicationStopping.Register(() => Server.Dispose()); _appLifetime.ApplicationStopping.Register(() => {
foreach (var item in Servers) {
item.Dispose();
}
});
return Task.FromResult(0); return Task.FromResult(0);
} }
} }
......
...@@ -5,7 +5,7 @@ using System.Threading.Tasks; ...@@ -5,7 +5,7 @@ using System.Threading.Tasks;
namespace Cap.Consistency namespace Cap.Consistency
{ {
public interface ITopicServer : IDisposable public interface IProcessingServer : IDisposable
{ {
void Start(); void Start();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment