Commit 9bac150d authored by Savorboard's avatar Savorboard

Merge branch 'master' of https://github.com/dotnetcore/CAP

# Conflicts:
#	src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs
parents 7fabeae2 0838a23d
...@@ -159,7 +159,7 @@ namespace DotNetCore.CAP.AzureServiceBus ...@@ -159,7 +159,7 @@ namespace DotNetCore.CAP.AzureServiceBus
private Task OnConsumerReceived(Message message, CancellationToken token) private Task OnConsumerReceived(Message message, CancellationToken token)
{ {
var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value.ToString()); var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value?.ToString());
header.Add(Headers.Group, _subscriptionName); header.Add(Headers.Group, _subscriptionName);
var context = new TransportMessage(header, message.Body); var context = new TransportMessage(header, message.Body);
......
...@@ -43,7 +43,7 @@ namespace DotNetCore.CAP.Kafka ...@@ -43,7 +43,7 @@ namespace DotNetCore.CAP.Kafka
var result = await producer.ProduceAsync(message.GetName(), new Message<string, byte[]> var result = await producer.ProduceAsync(message.GetName(), new Message<string, byte[]>
{ {
Headers = headers, Headers = headers,
Key = message.GetId(), Key = message.Headers.TryGetValue(KafkaHeaders.KafkaKey, out string kafkaMessageKey) && !string.IsNullOrEmpty(kafkaMessageKey) ? kafkaMessageKey : message.GetId(),
Value = message.Body Value = message.Body
}); });
......
...@@ -62,6 +62,8 @@ namespace DotNetCore.CAP.Kafka ...@@ -62,6 +62,8 @@ namespace DotNetCore.CAP.Kafka
} }
headers.Add(Messages.Headers.Group, _groupId); headers.Add(Messages.Headers.Group, _groupId);
headers.Add(KafkaHeaders.KafkaKey, consumerResult.Key);
if (_kafkaOptions.CustomHeaders != null) if (_kafkaOptions.CustomHeaders != null)
{ {
var customHeaders = _kafkaOptions.CustomHeaders(consumerResult); var customHeaders = _kafkaOptions.CustomHeaders(consumerResult);
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.Kafka
{
public static class KafkaHeaders
{
public const string KafkaKey = "cap-kafka-key";
}
}
...@@ -175,7 +175,7 @@ select count(Id) from {_recName} with (nolock) where StatusName = N'Failed';"; ...@@ -175,7 +175,7 @@ select count(Id) from {_recName} with (nolock) where StatusName = N'Failed';";
var sqlQuery2008 = $@" var sqlQuery2008 = $@"
with aggr as ( with aggr as (
select replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) as [Key], select replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) as [Key],
count(id) [Count] count(Id) [Count]
from {tableName} from {tableName}
where StatusName = @statusName where StatusName = @statusName
group by replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) group by replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added))
...@@ -186,7 +186,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; ...@@ -186,7 +186,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
var sqlQuery = $@" var sqlQuery = $@"
with aggr as ( with aggr as (
select FORMAT(Added,'yyyy-MM-dd-HH') as [Key], select FORMAT(Added,'yyyy-MM-dd-HH') as [Key],
count(id) [Count] count(Id) [Count]
from {tableName} from {tableName}
where StatusName = @statusName where StatusName = @statusName
group by FORMAT(Added,'yyyy-MM-dd-HH') group by FORMAT(Added,'yyyy-MM-dd-HH')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment