Unverified Commit 0838a23d authored by Andrew Jaffie's avatar Andrew Jaffie Committed by GitHub

Add capability to optionally set kafka message key via new header (#498)

parent 4991c388
......@@ -43,7 +43,7 @@ namespace DotNetCore.CAP.Kafka
var result = await producer.ProduceAsync(message.GetName(), new Message<string, byte[]>
{
Headers = headers,
Key = message.GetId(),
Key = message.Headers.TryGetValue(KafkaHeaders.KafkaKey, out string kafkaMessageKey) && !string.IsNullOrEmpty(kafkaMessageKey) ? kafkaMessageKey : message.GetId(),
Value = message.Body
});
......
......@@ -62,6 +62,8 @@ namespace DotNetCore.CAP.Kafka
}
headers.Add(Messages.Headers.Group, _groupId);
headers.Add(KafkaHeaders.KafkaKey, consumerResult.Key);
if (_kafkaOptions.CustomHeaders != null)
{
var customHeaders = _kafkaOptions.CustomHeaders(consumerResult);
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.Kafka
{
public static class KafkaHeaders
{
public const string KafkaKey = "cap-kafka-key";
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment