Commit 444de4e9 authored by yangxiaodong's avatar yangxiaodong

--no commit message

--no commit message
parent a614ab1d
using System;
using System.Collections.Generic;
using System.Reflection;
using Cap.Consistency.Server.Internal.Infrastructure;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RdKafka;
using System.Text;
namespace Cap.Consistency.Server
{
public class ConsistencyServer : IServer
public class ConsistencyServer : IConsistencyServer
{
private Stack<IDisposable> _disposables;
private readonly IApplicationLifetime _applicationLifetime;
private readonly ILogger _logger;
private readonly IConsumer _consumer;
public ConsistencyServer(IOptions<ConsistencyServerOptions> options, IApplicationLifetime applicationLifetime,
ILoggerFactory loggerFactory)
public ConsistencyServer(IOptions<ConsistencyServerOptions> options, ILoggerFactory loggerFactory)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
if (applicationLifetime == null)
{
throw new ArgumentNullException(nameof(applicationLifetime));
}
if (loggerFactory == null)
{
throw new ArgumentNullException(nameof(loggerFactory));
}
Options = options.Value ?? new ConsistencyServerOptions();
_applicationLifetime = applicationLifetime;
_logger = loggerFactory.CreateLogger(typeof(ConsistencyServer).GetTypeInfo().Namespace);
_consumer = Options.ApplicationServices.GetService<IConsumer>();
}
public ConsistencyServerOptions Options { get; }
public IFeatureCollection Features { get; set; }
public void Start<TContext>(IHttpApplication<TContext> application)
{
if (_disposables != null)
{
// The server has already started and/or has not been cleaned up yet
throw new InvalidOperationException("Server has already started.");
}
_disposables = new Stack<IDisposable>();
var trace = new ConsistencyTrace(_logger);
public ConsistencyServerOptions Options { get; }
_consumer.Log = trace;
public void Run() {
//配置消费者组
var config = new Config() { GroupId = "example-csharp-consumer" };
using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) {
_disposables.Push(_consumer);
var threadCount = Options.ThreadCount;
//注册一个事件
consumer.OnMessage += (obj, msg) =>
{
string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
};
if (threadCount <= 0)
{
throw new ArgumentOutOfRangeException(nameof(threadCount),
threadCount,
"ThreadCount must be positive.");
}
//订阅一个或者多个Topic
consumer.Subscribe(new[] { "testtopic" });
try
{
_consumer.Start(threadCount);
}
catch (Exception ex)
{
throw ex;
}
}
//启动
consumer.Start();
public void Dispose()
{
if (_disposables != null)
{
while (_disposables.Count > 0)
{
_disposables.Pop().Dispose();
}
_disposables = null;
_logger.LogInformation("Started consumer...");
}
}
}
......
namespace Cap.Consistency.Server
{
public interface IConsistencyServer
{
ConsistencyServerOptions Options { get; }
void Run();
}
}
\ No newline at end of file
......@@ -7,7 +7,8 @@
"Microsoft.AspNetCore.Hosting": "1.1.0-*",
"Microsoft.Extensions.Logging.Abstractions": "1.1.0-*",
"Microsoft.Extensions.Options": "1.1.0",
"NETStandard.Library": "1.6.1"
"NETStandard.Library": "1.6.1",
"RdKafka": "0.9.2-ci-189"
},
"frameworks": {
"netstandard1.6": {
......
......@@ -2,5 +2,7 @@
{
public class BrokerOptions
{
public string HostName { get; set; }
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment