Commit ab8a38a6 authored by Savorboard

Compatible with confluent-kafka-dotnet version 1.0

parent c7a941c1
@@ -20,14 +20,14 @@ namespace DotNetCore.CAP
         /// Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter.
         /// </para>
         /// </summary>
-        public readonly ConcurrentDictionary<string, object> MainConfig;
+        public readonly ConcurrentDictionary<string, string> MainConfig;

-        private IEnumerable<KeyValuePair<string, object>> _kafkaConfig;
+        private IEnumerable<KeyValuePair<string, string>> _kafkaConfig;

         public KafkaOptions()
         {
-            MainConfig = new ConcurrentDictionary<string, object>();
+            MainConfig = new ConcurrentDictionary<string, string>();
         }

         /// <summary>
@@ -43,7 +43,7 @@ namespace DotNetCore.CAP
         /// </summary>
         public string Servers { get; set; }

-        internal IEnumerable<KeyValuePair<string, object>> AsKafkaConfig()
+        internal IEnumerable<KeyValuePair<string, string>> AsKafkaConfig()
         {
             if (_kafkaConfig == null)
             {
...
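Note: confluent-kafka-dotnet 1.0 builders consume configuration as `IEnumerable<KeyValuePair<string, string>>`, which is why `MainConfig` and `AsKafkaConfig()` switch from `object` to `string` values. A minimal sketch of what that means for callers, assuming the option names shown in the hunk above (the broker address and the librdkafka keys are illustrative, not part of this commit):

```csharp
using DotNetCore.CAP;

// Sketch only: librdkafka settings are plain strings in confluent-kafka-dotnet 1.0,
// so numeric and boolean values are written in their string form.
var options = new KafkaOptions
{
    Servers = "localhost:9092" // hypothetical broker address
};

options.MainConfig["queue.buffering.max.ms"] = "10";   // previously an object/int value
options.MainConfig["enable.idempotence"] = "true";

// AsKafkaConfig() (internal) now yields KeyValuePair<string, string> entries,
// which ProducerBuilder / ConsumerBuilder accept directly.
```

In an application this dictionary is normally populated inside CAP's Kafka configuration callback rather than on a hand-built `KafkaOptions` instance.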
@@ -3,7 +3,6 @@
 using System;
 using System.Collections.Concurrent;
-using System.Diagnostics;
 using System.Threading;
 using Confluent.Kafka;
 using Microsoft.Extensions.Logging;
@@ -13,81 +12,61 @@ namespace DotNetCore.CAP.Kafka
 {
     public class ConnectionPool : IConnectionPool, IDisposable
     {
-        private readonly ILogger<ConnectionPool> _logger;
-        private readonly Func<Producer> _activator;
-        private readonly ConcurrentQueue<Producer> _pool;
-        private int _count;
+        private readonly KafkaOptions _options;
+        private readonly ConcurrentQueue<IProducer<Null, string>> _producerPool;
+        private int _pCount;
         private int _maxSize;

         public ConnectionPool(ILogger<ConnectionPool> logger, KafkaOptions options)
         {
-            _logger = logger;
-            _pool = new ConcurrentQueue<Producer>();
-            _maxSize = options.ConnectionPoolSize;
-            _activator = CreateActivator(options);
             ServersAddress = options.Servers;
-            _logger.LogDebug("Kafka configuration of CAP :\r\n {0}",
-                JsonConvert.SerializeObject(options.AsKafkaConfig(), Formatting.Indented));
+            _options = options;
+            _producerPool = new ConcurrentQueue<IProducer<Null, string>>();
+            _maxSize = options.ConnectionPoolSize;
+
+            logger.LogDebug("Kafka configuration of CAP :\r\n {0}", JsonConvert.SerializeObject(options.AsKafkaConfig(), Formatting.Indented));
         }

         public string ServersAddress { get; }

-        Producer IConnectionPool.Rent()
-        {
-            return Rent();
-        }
-
-        bool IConnectionPool.Return(Producer connection)
-        {
-            return Return(connection);
-        }
-
-        public void Dispose()
-        {
-            _maxSize = 0;
-            while (_pool.TryDequeue(out var context))
-            {
-                context.Dispose();
-            }
-        }
-
-        private static Func<Producer> CreateActivator(KafkaOptions options)
-        {
-            return () => new Producer(options.AsKafkaConfig());
-        }
-
-        public virtual Producer Rent()
+        public IProducer<Null, string> RentProducer()
         {
-            if (_pool.TryDequeue(out var connection))
+            if (_producerPool.TryDequeue(out var producer))
             {
-                Interlocked.Decrement(ref _count);
-                Debug.Assert(_count >= 0);
-                return connection;
+                Interlocked.Decrement(ref _pCount);
+                return producer;
             }

-            connection = _activator();
-            return connection;
+            producer = new ProducerBuilder<Null, string>(_options.AsKafkaConfig()).Build();
+
+            return producer;
         }

-        public virtual bool Return(Producer connection)
+        public bool Return(IProducer<Null, string> producer)
         {
-            if (Interlocked.Increment(ref _count) <= _maxSize)
+            if (Interlocked.Increment(ref _pCount) <= _maxSize)
             {
-                _pool.Enqueue(connection);
+                _producerPool.Enqueue(producer);
                 return true;
             }

-            Interlocked.Decrement(ref _count);
-            Debug.Assert(_maxSize == 0 || _pool.Count <= _maxSize);
+            Interlocked.Decrement(ref _pCount);
             return false;
         }
+
+        public void Dispose()
+        {
+            _maxSize = 0;
+            while (_producerPool.TryDequeue(out var context))
+            {
+                context.Dispose();
+            }
+        }
     }
 }
\ No newline at end of file
@@ -9,8 +9,8 @@ namespace DotNetCore.CAP.Kafka
 {
     string ServersAddress { get; }

-    Producer Rent();
+    IProducer<Null, string> RentProducer();

-    bool Return(Producer context);
+    bool Return(IProducer<Null, string> producer);
 }
}
\ No newline at end of file
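The pool now hands out `IProducer<Null, string>` instances built via `ProducerBuilder` instead of the removed 0.x `Producer` class. A rough sketch of the rent/produce/return cycle against the new surface, mirroring what the publisher below does (the logger, topic name and broker address are placeholders, not part of the commit; whether the caller disposes a producer that the full pool rejects is left to the real sender):

```csharp
using System.Threading.Tasks;
using Confluent.Kafka;
using DotNetCore.CAP;
using DotNetCore.CAP.Kafka;
using Microsoft.Extensions.Logging.Abstractions;

class PoolUsageSketch
{
    public static async Task RunAsync()
    {
        var options = new KafkaOptions { Servers = "localhost:9092" }; // placeholder address
        var pool = new ConnectionPool(NullLogger<ConnectionPool>.Instance, options);

        var producer = pool.RentProducer(); // dequeue an existing client or build a new one
        try
        {
            await producer.ProduceAsync("cap.demo.topic", // hypothetical topic
                new Message<Null, string> { Value = "hello" });
        }
        finally
        {
            // Hand the client back; when the pool is already at ConnectionPoolSize
            // Return() reports false and the instance is simply not re-queued.
            pool.Return(producer);
        }
    }
}
```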
@@ -2,8 +2,8 @@
 // Licensed under the MIT License. See License.txt in the project root for license information.

 using System;
-using System.Text;
 using System.Threading.Tasks;
+using Confluent.Kafka;
 using DotNetCore.CAP.Internal;
 using DotNetCore.CAP.Processor.States;
 using Microsoft.Extensions.Logging;
@@ -27,22 +27,24 @@ namespace DotNetCore.CAP.Kafka
         public override async Task<OperateResult> PublishAsync(string keyName, string content)
         {
-            var producer = _connectionPool.Rent();
+            var producer = _connectionPool.RentProducer();

             try
             {
-                var contentBytes = Encoding.UTF8.GetBytes(content);
-                var message = await producer.ProduceAsync(keyName, null, contentBytes);
-
-                if (message.Error.HasError)
-                {
-                    throw new PublisherSentFailedException(message.Error.ToString());
-                }
+                var result = await producer.ProduceAsync(keyName, new Message<Null, string>()
+                {
+                    Value = content
+                });

+                if (result.Status == PersistenceStatus.Persisted || result.Status == PersistenceStatus.PossiblyPersisted)
+                {
                     _logger.LogDebug($"kafka topic message [{keyName}] has been published.");
                     return OperateResult.Success;
+                }
+
+                throw new PublisherSentFailedException("kafka message persisted failed!");
             }
             catch (Exception ex)
             {
                 var wapperEx = new PublisherSentFailedException(ex.Message, ex);
...
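In the 1.0 client, `ProduceAsync` takes a `Message<TKey, TValue>` instead of raw bytes, and delivery is reported through `DeliveryResult.Status` rather than an `Error.HasError` flag, which is why the sender now checks `PersistenceStatus`. A standalone sketch of the same pattern against the raw client, with a placeholder broker address and topic:

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class ProduceSketch
{
    public static async Task Main()
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; // placeholder

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        {
            var result = await producer.ProduceAsync("cap.demo.topic", // hypothetical topic
                new Message<Null, string> { Value = "payload" });

            // Persisted: acknowledged by the broker. PossiblyPersisted: sent, ack not (yet) observed.
            if (result.Status == PersistenceStatus.Persisted ||
                result.Status == PersistenceStatus.PossiblyPersisted)
            {
                Console.WriteLine($"delivered to {result.TopicPartitionOffset}");
            }
            else
            {
                throw new InvalidOperationException("kafka message was not persisted");
            }
        }
    }
}
```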
@@ -3,10 +3,8 @@
 using System;
 using System.Collections.Generic;
-using System.Text;
 using System.Threading;
 using Confluent.Kafka;
-using Confluent.Kafka.Serialization;

 namespace DotNetCore.CAP.Kafka
 {
@@ -14,13 +12,12 @@ namespace DotNetCore.CAP.Kafka
     {
         private readonly string _groupId;
         private readonly KafkaOptions _kafkaOptions;
-        private Consumer<Null, string> _consumerClient;
+        private IConsumer<Null, string> _consumerClient;

         public KafkaConsumerClient(string groupId, KafkaOptions options)
         {
             _groupId = groupId;
             _kafkaOptions = options ?? throw new ArgumentNullException(nameof(options));
-            StringDeserializer = new StringDeserializer(Encoding.UTF8);
             InitKafkaClient();
         }
@@ -47,16 +44,25 @@ namespace DotNetCore.CAP.Kafka
         {
             while (true)
             {
-                cancellationToken.ThrowIfCancellationRequested();
-                _consumerClient.Poll(timeout);
-            }
+                var consumerResult = _consumerClient.Consume(cancellationToken);
+
+                if (consumerResult.IsPartitionEOF || consumerResult.Value == null) continue;
+
+                var message = new MessageContext
+                {
+                    Group = _groupId,
+                    Name = consumerResult.Topic,
+                    Content = consumerResult.Value
+                };
+
+                OnMessageReceived?.Invoke(consumerResult, message);
+            }

             // ReSharper disable once FunctionNeverReturns
         }

         public void Commit()
         {
-            _consumerClient.CommitAsync();
+            _consumerClient.Commit();
         }
         public void Reject()
@@ -76,47 +82,22 @@ namespace DotNetCore.CAP.Kafka
             lock (_kafkaOptions)
             {
                 _kafkaOptions.MainConfig["group.id"] = _groupId;
+                _kafkaOptions.MainConfig["auto.offset.reset"] = "earliest";
                 var config = _kafkaOptions.AsKafkaConfig();
-                _consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
-                _consumerClient.OnConsumeError += ConsumerClient_OnConsumeError;
-                _consumerClient.OnMessage += ConsumerClient_OnMessage;
-                _consumerClient.OnError += ConsumerClient_OnError;
+                _consumerClient = new ConsumerBuilder<Null, string>(config)
+                    .SetErrorHandler(ConsumerClient_OnConsumeError)
+                    .Build();
             }
         }
-        private void ConsumerClient_OnConsumeError(object sender, Message e)
-        {
-            var message = e.Deserialize<Null, string>(null, StringDeserializer);
-            var logArgs = new LogMessageEventArgs
-            {
-                LogType = MqLogType.ConsumeError,
-                Reason = $"An error occurred during consume the message; Topic:'{e.Topic}'," +
-                         $"Message:'{message.Value}', Reason:'{e.Error}'."
-            };
-            OnLog?.Invoke(sender, logArgs);
-        }
-
-        private void ConsumerClient_OnMessage(object sender, Message<Null, string> e)
-        {
-            var message = new MessageContext
-            {
-                Group = _groupId,
-                Name = e.Topic,
-                Content = e.Value
-            };
-            OnMessageReceived?.Invoke(sender, message);
-        }
-
-        private void ConsumerClient_OnError(object sender, Error e)
+        private void ConsumerClient_OnConsumeError(IConsumer<Null, string> consumer, Error e)
         {
             var logArgs = new LogMessageEventArgs
             {
                 LogType = MqLogType.ServerConnError,
-                Reason = e.ToString()
+                Reason = $"An error occurred during connect kafka --> {e.Reason}"
             };
-            OnLog?.Invoke(sender, logArgs);
+            OnLog?.Invoke(null, logArgs);
         }

         #endregion private methods
...
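The consumer side moves from the event-based 0.x `Consumer<TKey, TValue>` (OnMessage, OnError, OnConsumeError plus `Poll`) to a `ConsumerBuilder` with handler callbacks and a blocking `Consume` loop, and `Commit()` is now synchronous. A minimal sketch of that loop outside CAP, under the assumption that the group id, topic and servers are placeholders (note that `Consume(cancellationToken)` throws `OperationCanceledException` when the token is cancelled):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

class ConsumeSketch
{
    public static void Run(CancellationToken cancellationToken)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",        // placeholder
            GroupId = "cap.demo.group",                 // placeholder
            AutoOffsetReset = AutoOffsetReset.Earliest  // same intent as "auto.offset.reset" = "earliest"
        };

        using (var consumer = new ConsumerBuilder<Ignore, string>(config)
                   .SetErrorHandler((c, e) => Console.WriteLine($"kafka error --> {e.Reason}"))
                   .Build())
        {
            consumer.Subscribe("cap.demo.topic"); // hypothetical topic

            while (!cancellationToken.IsCancellationRequested)
            {
                var result = consumer.Consume(cancellationToken); // blocks until a message arrives

                if (result == null || result.IsPartitionEOF) continue;

                Console.WriteLine($"{result.Topic}: {result.Message.Value}");
                consumer.Commit(result); // synchronous commit, replaces the old CommitAsync()
            }

            consumer.Close();
        }
    }
}
```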