Commit 75de50d4 authored by Savorboard's avatar Savorboard Committed by GitHub

Merge pull request #19 from alexinea/master

thanks alexinea!
parents 926769d3 4c5da09d
...@@ -21,32 +21,34 @@ namespace DotNetCore.CAP ...@@ -21,32 +21,34 @@ namespace DotNetCore.CAP
/// Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter. /// Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter.
/// </para> /// </para>
/// </summary> /// </summary>
public IDictionary<string, object> MainConfig { get; private set; } public readonly IDictionary<string, object> MainConfig;
/// <summary> /// <summary>
/// The `bootstrap.servers` item config of `MainConfig`. /// The `bootstrap.servers` item config of <see cref="MainConfig"/>.
/// <para> /// <para>
/// Initial list of brokers as a CSV list of broker host or host:port. /// Initial list of brokers as a CSV list of broker host or host:port.
/// </para> /// </para>
/// </summary> /// </summary>
public string Servers { get; set; } public string Servers { get; set; }
internal IEnumerable<KeyValuePair<string, object>> AsRdkafkaConfig() internal IEnumerable<KeyValuePair<string, object>> AskafkaConfig()
{ {
if (MainConfig.ContainsKey("bootstrap.servers")) if (MainConfig.ContainsKey("bootstrap.servers"))
{
return MainConfig.AsEnumerable(); return MainConfig.AsEnumerable();
}
if (string.IsNullOrEmpty(Servers)) if (string.IsNullOrWhiteSpace(Servers))
{ {
throw new ArgumentNullException(nameof(Servers)); throw new ArgumentNullException(nameof(Servers));
} }
else
{
MainConfig.Add("bootstrap.servers", Servers); MainConfig.Add("bootstrap.servers", Servers);
}
MainConfig["queue.buffering.max.ms"] = "10"; MainConfig["queue.buffering.max.ms"] = "10";
MainConfig["socket.blocking.max.ms"] = "10"; MainConfig["socket.blocking.max.ms"] = "10";
MainConfig["enable.auto.commit"] = "false"; MainConfig["enable.auto.commit"] = "false";
return MainConfig.AsEnumerable(); return MainConfig.AsEnumerable();
} }
} }
......
...@@ -21,7 +21,7 @@ namespace DotNetCore.CAP.Kafka ...@@ -21,7 +21,7 @@ namespace DotNetCore.CAP.Kafka
public KafkaConsumerClient(string groupId, KafkaOptions options) public KafkaConsumerClient(string groupId, KafkaOptions options)
{ {
_groupId = groupId; _groupId = groupId;
_kafkaOptions = options; _kafkaOptions = options ?? throw new ArgumentNullException(nameof(options));
StringDeserializer = new StringDeserializer(Encoding.UTF8); StringDeserializer = new StringDeserializer(Encoding.UTF8);
} }
...@@ -65,7 +65,7 @@ namespace DotNetCore.CAP.Kafka ...@@ -65,7 +65,7 @@ namespace DotNetCore.CAP.Kafka
{ {
_kafkaOptions.MainConfig.Add("group.id", _groupId); _kafkaOptions.MainConfig.Add("group.id", _groupId);
var config = _kafkaOptions.AsRdkafkaConfig(); var config = _kafkaOptions.AskafkaConfig();
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer); _consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnMessage += ConsumerClient_OnMessage; _consumerClient.OnMessage += ConsumerClient_OnMessage;
...@@ -80,6 +80,7 @@ namespace DotNetCore.CAP.Kafka ...@@ -80,6 +80,7 @@ namespace DotNetCore.CAP.Kafka
Name = e.Topic, Name = e.Topic,
Content = e.Value Content = e.Value
}; };
OnMessageReceieved?.Invoke(sender, message); OnMessageReceieved?.Invoke(sender, message);
} }
......
using Microsoft.Extensions.Options; using System;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka namespace DotNetCore.CAP.Kafka
{ {
...@@ -8,7 +9,7 @@ namespace DotNetCore.CAP.Kafka ...@@ -8,7 +9,7 @@ namespace DotNetCore.CAP.Kafka
public KafkaConsumerClientFactory(IOptions<KafkaOptions> kafkaOptions) public KafkaConsumerClientFactory(IOptions<KafkaOptions> kafkaOptions)
{ {
_kafkaOptions = kafkaOptions.Value; _kafkaOptions = kafkaOptions?.Value ?? throw new ArgumentNullException(nameof(kafkaOptions));
} }
public IConsumerClient Create(string groupId) public IConsumerClient Create(string groupId)
......
...@@ -26,7 +26,7 @@ namespace DotNetCore.CAP.Kafka ...@@ -26,7 +26,7 @@ namespace DotNetCore.CAP.Kafka
{ {
try try
{ {
var config = _kafkaOptions.AsRdkafkaConfig(); var config = _kafkaOptions.AskafkaConfig();
var contentBytes = Encoding.UTF8.GetBytes(content); var contentBytes = Encoding.UTF8.GetBytes(content);
using (var producer = new Producer(config)) using (var producer = new Producer(config))
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment