Commit 98cd5861 authored by yangxiaodong's avatar yangxiaodong

Refactor KafkaOptions

parent f184550a
......@@ -35,7 +35,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
public async Task<OperateResult> StoreSentMessageAsync(CapSentMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
Context.Add(message);
await Context.SaveChangesAsync();
return OperateResult.Success;
......
......@@ -18,7 +18,6 @@ namespace Microsoft.Extensions.DependencyInjection
/// <returns>An <see cref="CapBuilder"/> for creating and configuring the CAP system.</returns>
public static CapBuilder AddKafka(this CapBuilder builder, Action<KafkaOptions> setupAction)
{
if (setupAction == null) throw new ArgumentNullException(nameof(setupAction));
builder.Services.Configure(setupAction);
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Linq;
namespace DotNetCore.CAP.Kafka
{
/// <summary>
/// Provides programmatic configuration for the CAP kafka project.
/// </summary>
public class KafkaOptions
{
/// <summary>
/// Gets the Kafka broker id.
/// </summary>
public int BrokerId { get; }
public KafkaOptions()
{
MainConfig = new Dictionary<string, object>();
}
/// <summary>
/// Gets the Kafka broker hostname.
/// librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
/// <para>
/// Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter.
/// </para>
/// </summary>
public string Host { get; }
public IDictionary<string, object> MainConfig { get; private set; }
/// <summary>
/// Gets the Kafka broker port.
/// The `bootstrap.servers` item config of `MainConfig`.
/// <para>
/// Initial list of brokers as a CSV list of broker host or host:port.
/// </para>
/// </summary>
public int Port { get; }
public string Servers { get; set; }
/// <summary>
/// Returns a JSON representation of the BrokerMetadata object.
/// </summary>
/// <returns>
/// A JSON representation of the BrokerMetadata object.
/// </returns>
public override string ToString()
=> $"{{ \"BrokerId\": {BrokerId}, \"Host\": \"{Host}\", \"Port\": {Port} }}";
internal IEnumerable<KeyValuePair<string, object>> AsRdkafkaConfig()
{
if (!MainConfig.ContainsKey("bootstrap.servers"))
{
if (string.IsNullOrEmpty(Servers))
{
throw new ArgumentNullException(nameof(Servers));
}
else
{
MainConfig.Add("bootstrap.servers", Servers);
}
}
return MainConfig.AsEnumerable();
}
}
}
}
\ No newline at end of file
......@@ -123,7 +123,7 @@ namespace DotNetCore.CAP.Kafka
{
try
{
var config = new Dictionary<string, object> { { "bootstrap.servers", _kafkaOptions.Host } };
var config = _kafkaOptions.AsRdkafkaConfig();
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
var message = producer.ProduceAsync(topic, null, content).Result;
......
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
......@@ -56,12 +55,11 @@ namespace DotNetCore.CAP.Kafka
private void InitKafkaClient()
{
var config = new Dictionary<string, object>{
{ "group.id", _groupId },
{ "bootstrap.servers", _kafkaOptions.Host }
};
_kafkaOptions.MainConfig.Add("group.id", _groupId);
var config = _kafkaOptions.AsRdkafkaConfig();
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
_consumerClient.OnMessage += ConsumerClient_OnMessage;
}
......
using System;
using System.Collections.Generic;
using System.Text;
namespace DotNetCore.CAP.RabbitMQ
namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitMQOptions
{
......@@ -68,4 +64,4 @@ namespace DotNetCore.CAP.RabbitMQ
/// </summary>
public int Port { get; set; } = -1;
}
}
}
\ No newline at end of file
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.RabbitMQ
{
......
......@@ -7,7 +7,7 @@ namespace DotNetCore.CAP.Infrastructure
/// </summary>
public class CapOptions
{
/// <summary>
/// <summary>
/// Corn expression for configuring retry cron job. Default is 1 min.
/// </summary>
public string CronExp { get; set; } = Cron.Minutely();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment