Commit 696cf7e1 authored by yangxiaodong's avatar yangxiaodong

refactor

parent fc967b69
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Text;
namespace Cap.Consistency.Attributes
namespace Cap.Consistency.Abstractions
{
[AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, Inherited = true, AllowMultiple = true)]
......@@ -18,8 +18,6 @@ namespace Cap.Consistency.Attributes
get { return _name; }
}
public bool IsOneWay { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Cap.Consistency.Attributes
{
public class KafkaTopicAttribute : TopicAttribute
{
public KafkaTopicAttribute(string topicName)
: this(topicName, 0) { }
public KafkaTopicAttribute(string topicName, int partition)
: this(topicName, partition, 0) { }
public KafkaTopicAttribute(string topicName, int partition, long offset)
: base(topicName) {
Offset = offset;
Partition = partition;
}
public int Partition { get; }
public long Offset { get; }
public bool IsPartition { get { return Partition == 0; } }
public bool IsOffset { get { return Offset == 0; } }
public override string ToString() {
return Name;
}
}
}
......@@ -2,17 +2,12 @@
<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<AssemblyName>Cap.Consistency</AssemblyName>
<PackageId>Cap.Consistency</PackageId>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50</PackageTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.9.5" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.1" />
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Linq;
using System.Threading.Tasks;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Routing;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Cap.Consistency.Consumer.Kafka;
using Cap.Consistency.Internal;
namespace Cap.Consistency.Consumer
......@@ -20,6 +16,7 @@ namespace Cap.Consistency.Consumer
{
private readonly IServiceProvider _serviceProvider;
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IConsumerClientFactory _consumerClientFactory;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private readonly MethodMatcherCache _selector;
......@@ -31,6 +28,7 @@ namespace Cap.Consistency.Consumer
public ConsumerHandler(
IServiceProvider serviceProvider,
IConsumerInvokerFactory consumerInvokerFactory,
IConsumerClientFactory consumerClientFactory,
ILoggerFactory loggerFactory,
ConsistencyMessageManager<T> messageManager,
MethodMatcherCache selector,
......@@ -41,6 +39,7 @@ namespace Cap.Consistency.Consumer
_loggerFactory = loggerFactory;
_serviceProvider = serviceProvider;
_consumerInvokerFactory = consumerInvokerFactory;
_consumerClientFactory = consumerClientFactory;
_options = options.Value;
_messageManager = messageManager;
}
......@@ -63,7 +62,7 @@ namespace Cap.Consistency.Consumer
var groupingMatchs = matchs.GroupBy(x => x.Value.GroupId);
foreach (var matchGroup in groupingMatchs) {
using (var client = new KafkaConsumerClient(matchGroup.Key, _options.BrokerUrlList)) {
using (var client = _consumerClientFactory.Create(matchGroup.Key, _options.BrokerUrlList)) {
client.MessageReceieved += OnMessageReceieved;
foreach (var item in matchGroup) {
......
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Infrastructure;
namespace Cap.Consistency.Consumer
{
public interface IConsumerClient
public interface IConsumerClient : IDisposable
{
void Subscribe(string topic);
void Subscribe(string topic, int partition);
void Listening(TimeSpan timeout);
event EventHandler<DeliverMessage> MessageReceieved;
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Cap.Consistency.Consumer
{
public interface IConsumerClientFactory
{
IConsumerClient Create(string groupId, string clientHostAddress);
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Cap.Consistency.Infrastructure;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
namespace Cap.Consistency.Consumer.Kafka
{
public class KafkaTopicPartition
{
public string Topic { get; set; }
public int Partition { get; set; }
}
public class KafkaConsumerClient : IDisposable
{
private readonly string _groupId;
private readonly string _bootstrapServers;
private Consumer<Null, string> _consumerClient;
public event EventHandler<DeliverMessage> MessageReceieved;
public IDeserializer<string> StringDeserializer { get; set; }
public KafkaConsumerClient(string groupId, string bootstrapServers) {
_groupId = groupId;
_bootstrapServers = bootstrapServers;
StringDeserializer = new StringDeserializer(Encoding.UTF8);
}
public void Subscribe(string topicName, int partition) {
if (_consumerClient == null) {
InitKafkaClient();
}
_consumerClient.Assignment.Add(new TopicPartition(topicName, partition));
}
public void Subscribe(IEnumerable<KafkaTopicPartition> topicList) {
if (_consumerClient == null) {
InitKafkaClient();
}
if (topicList == null || topicList.Count() == 0) {
throw new ArgumentNullException(nameof(topicList));
}
foreach (var item in topicList) {
Subscribe(item.Topic, item.Partition);
}
}
public void Listening(TimeSpan timeout) {
while (true) {
_consumerClient.Poll(timeout);
}
}
public void Dispose() {
_consumerClient.Dispose();
}
#region private methods
private void InitKafkaClient() {
var config = new Dictionary<string, object>{
{ "group.id", "simple-csharp-consumer" },
{ "bootstrap.servers", _bootstrapServers }
};
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnMessage += ConsumerClient_OnMessage;
}
private void ConsumerClient_OnMessage(object sender, Message<Null, string> e) {
var message = new DeliverMessage {
MessageKey = e.Topic,
Value = e.Value
};
MessageReceieved?.Invoke(sender, message);
}
#endregion
//public void
}
}
//using System;
//using System.Collections.Generic;
//using System.Text;
//using System.Linq;
//using System.Threading.Tasks;
//using Cap.Consistency.Abstractions;
//using Cap.Consistency.Infrastructure;
//using Cap.Consistency.Routing;
//using Confluent.Kafka;
//using Confluent.Kafka.Serialization;
//using Microsoft.AspNetCore.Builder;
//using Microsoft.Extensions.Logging;
//using Microsoft.Extensions.Options;
//namespace Cap.Consistency.Consumer.Kafka
//{
// public class KafkaConsumerHandler<T> : ConsumerHandler<T> where T : ConsistencyMessage, new()
// {
// protected override void OnMessageReceieved(T message) {
// }
// public Task RouteAsync(TopicRouteContext context) {
// if (context == null) {
// throw new ArgumentNullException(nameof(context));
// }
// context.ServiceProvider = _serviceProvider;
// var matchs = _selector.SelectCandidates(context);
// var config = new Dictionary<string, object>
// {
// { "group.id", "simple-csharp-consumer" },
// { "bootstrap.servers", _options.BrokerUrlList }
// };
// using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8))) {
// var topicList = matchs.Select(item => new TopicPartitionOffset(item.Topic.Name, item.Topic.Partition, new Offset(item.Topic.Offset)));
// consumer.Assign(topicList);
// while (true) {
// if (consumer.Consume(out Message<Null, string> msg)) {
// T consistencyMessage = new T();
// var message = new DeliverMessage() {
// MessageKey = msg.Topic,
// Body = Encoding.UTF8.GetBytes(msg.Value)
// };
// var routeContext = new TopicRouteContext(message);
// var executeDescriptor = _selector.SelectBestCandidate(routeContext, matchs);
// if (executeDescriptor == null) {
// _logger.LogInformation("can not be fond topic execute");
// return Task.CompletedTask;
// }
// var consumerContext = new ConsumerContext(executeDescriptor, message);
// var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);
// _logger.LogInformation("consumer starting");
// return invoker.InvokeAsync();
// }
// }
// }
// }
// }
//}
......@@ -4,7 +4,6 @@ using System.Linq;
using System.Reflection;
using System.Text;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Attributes;
using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Routing;
......
using System.Reflection;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("ouraspnet")]
[assembly: AssemblyProduct("Cap.Consistence")]
[assembly: AssemblyTrademark("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("e8af8611-0ea4-4b19-bc48-87c57a87dc66")]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment