Commit 03392c00 authored by yangxiaodong's avatar yangxiaodong

update kafka proejct

parent de1715b7
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
<PropertyGroup> <PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework> <TargetFramework>netstandard1.6</TargetFramework>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
......
...@@ -38,6 +38,7 @@ namespace Cap.Consistency.Kafka ...@@ -38,6 +38,7 @@ namespace Cap.Consistency.Kafka
InitKafkaClient(); InitKafkaClient();
} }
_consumerClient.Assignment.Add(new TopicPartition(topicName, partition)); _consumerClient.Assignment.Add(new TopicPartition(topicName, partition));
_consumerClient.Subscribe(topicName);
} }
public void Listening(TimeSpan timeout) { public void Listening(TimeSpan timeout) {
...@@ -65,11 +66,12 @@ namespace Cap.Consistency.Kafka ...@@ -65,11 +66,12 @@ namespace Cap.Consistency.Kafka
private void ConsumerClient_OnMessage(object sender, Message<Null, string> e) { private void ConsumerClient_OnMessage(object sender, Message<Null, string> e) {
var message = new DeliverMessage { var message = new DeliverMessage {
MessageKey = e.Topic, MessageKey = e.Topic,
Value = e.Value Value = e.Value,
Body = Encoding.UTF8.GetBytes(e.Value)
}; };
MessageReceieved?.Invoke(sender, message); MessageReceieved?.Invoke(sender, message);
} }
#endregion #endregion
} }
......
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Producer;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Cap.Consistency.Kafka namespace Cap.Consistency.Kafka
{ {
public class KafkaProducerClient public class KafkaProducerClient : IProducerClient
{ {
private readonly ConsistencyOptions _options;
private readonly ILogger _logger;
public KafkaProducerClient(IOptions<ConsistencyOptions> options, ILoggerFactory loggerFactory) {
_options = options.Value;
_logger = loggerFactory.CreateLogger(nameof(KafkaProducerClient));
}
public Task SendAsync(string topic, string content) {
var config = new Dictionary<string, object> { { "bootstrap.servers", _options.BrokerUrlList } };
try {
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) {
var task = producer.ProduceAsync(topic, null, content);
task.ContinueWith(g => {
_logger.LogInformation($"publish message to topic:{topic}\r\n -------content:{content}\r\n ");
});
//producer.Flush(1000);
return Task.CompletedTask;
}
}
catch (Exception e) {
_logger.LogError(new EventId(1), e, $"publish message to [topic:{topic}] ,content:{content}");
return Task.CompletedTask;
}
}
} }
} }
...@@ -7,7 +7,7 @@ namespace Cap.Consistency.Kafka ...@@ -7,7 +7,7 @@ namespace Cap.Consistency.Kafka
{ {
public class KafkaTopicAttribute : TopicAttribute public class KafkaTopicAttribute : TopicAttribute
{ {
public KafkaTopicAttribute(string topicName) public KafkaTopicAttribute(string topicName)
: this(topicName, 0) { } : this(topicName, 0) { }
public KafkaTopicAttribute(string topicName, int partition) public KafkaTopicAttribute(string topicName, int partition)
......
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency;
using Cap.Consistency.Consumer;
using Cap.Consistency.Kafka;
using Cap.Consistency.Producer;
namespace Microsoft.Extensions.DependencyInjection
{
public static class ConsistencyBuilderExtensions
{
public static ConsistencyBuilder AddKafka(this ConsistencyBuilder builder) {
builder.Services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
builder.Services.AddTransient<IProducerClient, KafkaProducerClient>();
return builder;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment