Commit fc967b69 authored by yangxiaodong's avatar yangxiaodong

add kafka extend project.

parent 33f5c903
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.9.5" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Cap.Consistency\Cap.Consistency.csproj" />
</ItemGroup>
</Project>
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
namespace Cap.Consistency.Kafka
{
public class KafkaTopicPartition
{
public string Topic { get; set; }
public int Partition { get; set; }
}
public class KafkaConsumerClient : IConsumerClient, IDisposable
{
private readonly string _groupId;
private readonly string _bootstrapServers;
private Consumer<Null, string> _consumerClient;
public event EventHandler<DeliverMessage> MessageReceieved;
public IDeserializer<string> StringDeserializer { get; set; }
public KafkaConsumerClient(string groupId, string bootstrapServers) {
_groupId = groupId;
_bootstrapServers = bootstrapServers;
StringDeserializer = new StringDeserializer(Encoding.UTF8);
}
public void Subscribe(string topic) {
Subscribe(topic, 0);
}
public void Subscribe(string topicName, int partition) {
if (_consumerClient == null) {
InitKafkaClient();
}
_consumerClient.Assignment.Add(new TopicPartition(topicName, partition));
}
public void Subscribe(IEnumerable<KafkaTopicPartition> topicList) {
if (_consumerClient == null) {
InitKafkaClient();
}
if (topicList == null || topicList.Count() == 0) {
throw new ArgumentNullException(nameof(topicList));
}
foreach (var item in topicList) {
Subscribe(item.Topic, item.Partition);
}
}
public void Listening(TimeSpan timeout) {
while (true) {
_consumerClient.Poll(timeout);
}
}
public void Dispose() {
_consumerClient.Dispose();
}
#region private methods
private void InitKafkaClient() {
var config = new Dictionary<string, object>{
{ "group.id", _groupId },
{ "bootstrap.servers", _bootstrapServers }
};
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnMessage += ConsumerClient_OnMessage;
}
private void ConsumerClient_OnMessage(object sender, Message<Null, string> e) {
var message = new DeliverMessage {
MessageKey = e.Topic,
Value = e.Value
};
MessageReceieved?.Invoke(sender, message);
}
#endregion
//public void
}
}
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Consumer;
namespace Cap.Consistency.Kafka
{
public class KafkaConsumerClientFactory : IConsumerClientFactory
{
public IConsumerClient Create(string groupId, string clientHostAddress) {
return new KafkaConsumerClient(groupId, clientHostAddress);
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;
namespace Cap.Consistency.Kafka
{
public class KafkaTopicAttribute : TopicAttribute
{
public KafkaTopicAttribute(string topicName)
: this(topicName, 0) { }
public KafkaTopicAttribute(string topicName, int partition)
: this(topicName, partition, 0) { }
public KafkaTopicAttribute(string topicName, int partition, long offset)
: base(topicName) {
Offset = offset;
Partition = partition;
}
public int Partition { get; }
public long Offset { get; }
public bool IsPartition { get { return Partition == 0; } }
public bool IsOffset { get { return Offset == 0; } }
public override string ToString() {
return Name;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment