Commit 004ed569 authored by Savorboard's avatar Savorboard

Refactoring kafka transport implementation for version 3.0

parent f37fd9ac
......@@ -3,6 +3,7 @@
using System;
using DotNetCore.CAP.Kafka;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
......@@ -23,10 +24,9 @@ namespace DotNetCore.CAP
services.Configure(_configure);
services.AddSingleton<ITransport, KafkaTransport>();
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddSingleton<ITransportPublisher, KafkaPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, KafkaPublishMessageSender>();
services.AddSingleton<IConnectionPool,ConnectionPool>();
services.AddSingleton<IConnectionPool, ConnectionPool>();
}
}
}
\ No newline at end of file
......@@ -13,7 +13,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.1.0" />
<PackageReference Include="Confluent.Kafka" Version="1.2.2" />
</ItemGroup>
<ItemGroup>
......
......@@ -3,33 +3,33 @@
using System;
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
namespace DotNetCore.CAP.Kafka
{
public class ConnectionPool : IConnectionPool, IDisposable
{
private readonly KafkaOptions _options;
private readonly ConcurrentQueue<IProducer<Null, string>> _producerPool;
private readonly ConcurrentQueue<IProducer<string, byte[]>> _producerPool;
private int _pCount;
private int _maxSize;
public ConnectionPool(ILogger<ConnectionPool> logger, IOptions<KafkaOptions> options)
{
_options = options.Value;
_producerPool = new ConcurrentQueue<IProducer<Null, string>>();
_producerPool = new ConcurrentQueue<IProducer<string, byte[]>>();
_maxSize = _options.ConnectionPoolSize;
logger.LogDebug("Kafka configuration of CAP :\r\n {0}", JsonConvert.SerializeObject(_options.AsKafkaConfig(), Formatting.Indented));
logger.LogDebug("Kafka configuration of CAP :\r\n {0}", JsonSerializer.Serialize(_options.AsKafkaConfig()));
}
public string ServersAddress => _options.Servers;
public IProducer<Null, string> RentProducer()
public IProducer<string, byte[]> RentProducer()
{
if (_producerPool.TryDequeue(out var producer))
{
......@@ -38,12 +38,12 @@ namespace DotNetCore.CAP.Kafka
return producer;
}
producer = new ProducerBuilder<Null, string>(_options.AsKafkaConfig()).Build();
producer = new ProducerBuilder<string, byte[]>(_options.AsKafkaConfig()).Build();
return producer;
}
public bool Return(IProducer<Null, string> producer)
public bool Return(IProducer<string, byte[]> producer)
{
if (Interlocked.Increment(ref _pCount) <= _maxSize)
{
......
......@@ -9,8 +9,8 @@ namespace DotNetCore.CAP.Kafka
{
string ServersAddress { get; }
IProducer<Null,string> RentProducer();
IProducer<string, byte[]> RentProducer();
bool Return(IProducer<Null, string> producer);
bool Return(IProducer<string, byte[]> producer);
}
}
\ No newline at end of file
......@@ -2,48 +2,53 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
{
internal class KafkaPublishMessageSender : BasePublishMessageSender
internal class KafkaTransport : ITransport
{
private readonly IConnectionPool _connectionPool;
private readonly ILogger _logger;
public KafkaPublishMessageSender(
ILogger<KafkaPublishMessageSender> logger,
IOptions<CapOptions> options,
IStorageConnection connection,
IConnectionPool connectionPool,
IStateChanger stateChanger)
: base(logger, options, connection, stateChanger)
public KafkaTransport(ILogger<KafkaTransport> logger, IConnectionPool connectionPool)
{
_logger = logger;
_connectionPool = connectionPool;
}
protected override string ServersAddress => _connectionPool.ServersAddress;
public string Address => _connectionPool.ServersAddress;
public override async Task<OperateResult> PublishAsync(string keyName, string content)
public async Task<OperateResult> SendAsync(TransportMessage message)
{
var producer = _connectionPool.RentProducer();
try
{
var result = await producer.ProduceAsync(keyName, new Message<Null, string>()
var headers = new Confluent.Kafka.Headers();
foreach (var header in message.Headers.Select(x => new Header(x.Key, Encoding.UTF8.GetBytes(x.Value))))
{
headers.Add(header);
}
var result = await producer.ProduceAsync(message.GetName(), new Message<string, byte[]>
{
Value = content
Headers = headers,
Key = message.GetId(),
Value = message.Body
});
if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted)
{
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");
_logger.LogDebug($"kafka topic message [{message.GetName()}] has been published.");
return OperateResult.Success;
}
......
......@@ -3,8 +3,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Confluent.Kafka;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
......@@ -15,7 +18,7 @@ namespace DotNetCore.CAP.Kafka
private readonly string _groupId;
private readonly KafkaOptions _kafkaOptions;
private IConsumer<Null, string> _consumerClient;
private IConsumer<string, byte[]> _consumerClient;
public KafkaConsumerClient(string groupId, IOptions<KafkaOptions> options)
{
......@@ -23,7 +26,7 @@ namespace DotNetCore.CAP.Kafka
_kafkaOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
}
public event EventHandler<MessageContext> OnMessageReceived;
public event EventHandler<TransportMessage> OnMessageReceived;
public event EventHandler<LogMessageEventArgs> OnLog;
......@@ -51,12 +54,8 @@ namespace DotNetCore.CAP.Kafka
if (consumerResult.IsPartitionEOF || consumerResult.Value == null) continue;
var message = new MessageContext
{
Group = _groupId,
Name = consumerResult.Topic,
Content = consumerResult.Value
};
var header = consumerResult.Headers.ToDictionary(x => x.Key, y => Encoding.UTF8.GetString(y.GetValueBytes()));
var message = new TransportMessage(header, consumerResult.Value);
OnMessageReceived?.Invoke(consumerResult, message);
}
......@@ -97,7 +96,7 @@ namespace DotNetCore.CAP.Kafka
_kafkaOptions.MainConfig["auto.offset.reset"] = "earliest";
var config = _kafkaOptions.AsKafkaConfig();
_consumerClient = new ConsumerBuilder<Null, string>(config)
_consumerClient = new ConsumerBuilder<string, byte[]>(config)
.SetErrorHandler(ConsumerClient_OnConsumeError)
.Build();
}
......@@ -108,7 +107,7 @@ namespace DotNetCore.CAP.Kafka
}
}
private void ConsumerClient_OnConsumeError(IConsumer<Null, string> consumer, Error e)
private void ConsumerClient_OnConsumeError(IConsumer<string, byte[]> consumer, Error e)
{
var logArgs = new LogMessageEventArgs
{
......
......@@ -3,7 +3,7 @@
public static class Headers
{
/// <summary>
/// Id of the message. Either set the ID explicitly when sending a message, or Rebus will assign one to the message.
/// Id of the message. Either set the ID explicitly when sending a message, or assign one to the message.
/// </summary>
public const string MessageId = "cap-msg-id";
......
......@@ -24,6 +24,11 @@ namespace DotNetCore.CAP.Messages
/// </summary>
public byte[] Body { get; }
public string GetId()
{
return Headers.TryGetValue(Messages.Headers.MessageId, out var value) ? value : null;
}
public string GetName()
{
return Headers.TryGetValue(Messages.Headers.MessageName, out var value) ? value : null;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment