Commit 966f605b authored by yangxiaodong's avatar yangxiaodong

refactor

parent a060be4f
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Cap.Consistency.Abstractions; using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure; using Cap.Consistency.Infrastructure;
using Cap.Consistency.Routing; using Cap.Consistency.Routing;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Cap.Consistency.Consumer.Kafka;
using Cap.Consistency.Internal;
namespace Cap.Consistency.Consumer namespace Cap.Consistency.Consumer
{ {
public class ConsumerHandler : IConsumerHandler public class ConsumerHandler<T> : IConsumerHandler<T> where T : ConsistencyMessage, new()
{ {
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly IConsumerInvokerFactory _consumerInvokerFactory; private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IConsumerExcutorSelector _selector;
private readonly ILoggerFactory _loggerFactory; private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly MethodMatcherCache _selector;
private readonly ConsistencyOptions _options;
private readonly ConsistencyMessageManager<T> _messageManager;
public event EventHandler<T> MessageReceieved;
public ConsumerHandler( public ConsumerHandler(
IServiceProvider serviceProvider, IServiceProvider serviceProvider,
IConsumerInvokerFactory consumerInvokerFactory, IConsumerInvokerFactory consumerInvokerFactory,
IConsumerExcutorSelector selector, ILoggerFactory loggerFactory,
ILoggerFactory loggerFactory) { ConsistencyMessageManager<T> messageManager,
MethodMatcherCache selector,
IOptions<ConsistencyOptions> options) {
_selector = selector; _selector = selector;
_logger = loggerFactory.CreateLogger<ConsumerHandler>(); _logger = loggerFactory.CreateLogger<ConsumerHandler<T>>();
_loggerFactory = loggerFactory; _loggerFactory = loggerFactory;
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_consumerInvokerFactory = consumerInvokerFactory; _consumerInvokerFactory = consumerInvokerFactory;
_options = options.Value;
_messageManager = messageManager;
}
protected virtual void OnMessageReceieved(T message) {
MessageReceieved?.Invoke(this, message);
} }
public Task RouteAsync(TopicRouteContext context) { public Task RouteAsync(TopicRouteContext context) {
...@@ -40,47 +58,52 @@ namespace Cap.Consistency.Consumer ...@@ -40,47 +58,52 @@ namespace Cap.Consistency.Consumer
context.ServiceProvider = _serviceProvider; context.ServiceProvider = _serviceProvider;
var matchs = _selector.SelectCandidates(context); var matchs = _selector.GetCandidatesMethods(context);
var groupingMatchs = matchs.GroupBy(x => x.Value.GroupId);
var config = new Dictionary<string, object> foreach (var matchGroup in groupingMatchs) {
{ using (var client = new KafkaConsumerClient(matchGroup.Key, _options.BrokerUrlList)) {
{ "group.id", "simple-csharp-consumer" }, client.MessageReceieved += OnMessageReceieved;
{ "bootstrap.servers", brokerList }
};
using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8))) {
//consumer.Assign(new List<TopicInfo> { new TopicInfo(topics.First(), 0, 0) });
while (true) { foreach (var item in matchGroup) {
Message<Null, string> msg; client.Subscribe(item.Key, item.Value.Topic.Partition);
if (consumer.Consume(out msg)) {
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
} }
client.Listening(TimeSpan.Zero);
} }
} }
return Task.CompletedTask;
}
private void OnMessageReceieved(object sender, DeliverMessage message) {
T consistencyMessage = new T() {
Id = message.MessageKey,
Payload = Encoding.UTF8.GetString(message.Body)
};
_logger.LogInformation("message receieved message topic name: " + consistencyMessage.Id);
if (matchs == null || matchs.Count == 0) { _messageManager.CreateAsync(consistencyMessage);
_logger.LogInformation("can not be fond topic route");
return Task.CompletedTask;
}
var executeDescriptor = _selector.SelectBestCandidate(context, matchs); try {
var executeDescriptor = _selector.GetTopicExector(message.MessageKey);
context.Handler = c => { var consumerContext = new ConsumerContext(executeDescriptor, message);
var consumerContext = new ConsumerContext(executeDescriptor);
var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext); var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);
_logger.LogInformation("consumer starting"); invoker.InvokeAsync();
return invoker.InvokeAsync(); _messageManager.UpdateAsync(consistencyMessage);
};
}
return Task.CompletedTask; catch (Exception ex) {
_logger.LogError("exception raised when excute method : " + ex.Message);
throw ex;
}
} }
} }
} }
using System;
using System.Collections.Generic;
using System.Text;
namespace Cap.Consistency.Consumer
{
public interface IConsumerClient
{
}
}
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Routing; using Cap.Consistency.Routing;
namespace Cap.Consistency.Consumer namespace Cap.Consistency.Consumer
{ {
public interface IConsumerHandler : ITopicRoute public interface IConsumerHandler<T> : ITopicRoute where T : ConsistencyMessage, new()
{ {
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment