Commit eb90cac6 authored by Savorboard's avatar Savorboard

Code refactoring

parent 02c0eefd
...@@ -51,14 +51,14 @@ namespace DotNetCore.CAP.Internal ...@@ -51,14 +51,14 @@ namespace DotNetCore.CAP.Internal
var messageId = SnowflakeId.Default().NextId().ToString(); var messageId = SnowflakeId.Default().NextId().ToString();
optionHeaders.Add(Headers.MessageId, messageId); optionHeaders.Add(Headers.MessageId, messageId);
optionHeaders.Add(Headers.MessageName, name);
optionHeaders.Add(Headers.Type, typeof(T).FullName);
optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString());
if (!optionHeaders.ContainsKey(Headers.CorrelationId)) if (!optionHeaders.ContainsKey(Headers.CorrelationId))
{ {
optionHeaders.Add(Headers.CorrelationId, messageId); optionHeaders.Add(Headers.CorrelationId, messageId);
optionHeaders.Add(Headers.CorrelationSequence, 0.ToString()); optionHeaders.Add(Headers.CorrelationSequence, 0.ToString());
} }
optionHeaders.Add(Headers.MessageName, name);
optionHeaders.Add(Headers.Type, typeof(T).FullName);
optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString());
var message = new Message(optionHeaders, value); var message = new Message(optionHeaders, value);
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Runtime.Serialization;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Diagnostics;
...@@ -162,22 +163,23 @@ namespace DotNetCore.CAP.Internal ...@@ -162,22 +163,23 @@ namespace DotNetCore.CAP.Internal
var name = transportMessage.GetName(); var name = transportMessage.GetName();
var group = transportMessage.GetGroup(); var group = transportMessage.GetGroup();
if (!_selector.TryGetTopicExecutor(name, group, out var executor))
{
var error = $"Message can not be found subscriber. Name:{name}, Group:{group}. {Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63";
throw new SubscriberNotFoundException(error);
}
var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType;
Message message; Message message;
var canFindSubscriber = _selector.TryGetTopicExecutor(name, group, out var executor);
try try
{ {
if (!canFindSubscriber)
{
var error = $"Message can not be found subscriber. Name:{name}, Group:{group}. {Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63";
throw new SubscriberNotFoundException(error);
}
var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType;
message = await _serializer.DeserializeAsync(transportMessage, type); message = await _serializer.DeserializeAsync(transportMessage, type);
} }
catch (Exception e) catch (Exception e)
{ {
transportMessage.Headers.Add(Headers.Exception, e.Message); transportMessage.Headers.Add(Headers.Exception, nameof(SerializationException) + "-->" + e.Message);
var dataUri = $"data:{transportMessage.Headers[Headers.Type]};base64," + Convert.ToBase64String(transportMessage.Body); var dataUri = $"data:{transportMessage.Headers[Headers.Type]};base64," + Convert.ToBase64String(transportMessage.Body);
message = new Message(transportMessage.Headers, dataUri); message = new Message(transportMessage.Headers, dataUri);
} }
...@@ -211,7 +213,7 @@ namespace DotNetCore.CAP.Internal ...@@ -211,7 +213,7 @@ namespace DotNetCore.CAP.Internal
} }
catch (Exception e) catch (Exception e)
{ {
_logger.LogError(e, "An exception occurred when store received message. Message:'{0}'.", transportMessage); _logger.LogError(e, "An exception occurred when process received message. Message:'{0}'.", transportMessage);
client.Reject(); client.Reject();
......
...@@ -102,10 +102,12 @@ namespace DotNetCore.CAP.Internal ...@@ -102,10 +102,12 @@ namespace DotNetCore.CAP.Internal
private async Task<bool> SetFailedState(MediumMessage message, Exception ex) private async Task<bool> SetFailedState(MediumMessage message, Exception ex)
{ {
//TODO: Add exception to content
var needRetry = UpdateMessageForRetry(message); var needRetry = UpdateMessageForRetry(message);
if (message.ExpiresAt != null)
{
message.ExpiresAt = DateTime.Now.AddDays(15);
}
await _dataStorage.ChangePublishStateAsync(message, StatusName.Failed); await _dataStorage.ChangePublishStateAsync(message, StatusName.Failed);
return needRetry; return needRetry;
......
...@@ -9,18 +9,18 @@ ...@@ -9,18 +9,18 @@
public const string MessageName = "cap-msg-name"; public const string MessageName = "cap-msg-name";
public const string CorrelationId = "cap-corr-id"; public const string Group = "cap-msg-group";
public const string CorrelationSequence = "cap-corr-seq";
/// <summary> /// <summary>
/// Message value .NET type /// Message value .NET type
/// </summary> /// </summary>
public const string Type = "cap-msg-type"; public const string Type = "cap-msg-type";
public const string CallbackName = "cap-callback-name"; public const string CorrelationId = "cap-corr-id";
public const string Group = "cap-msg-group"; public const string CorrelationSequence = "cap-corr-seq";
public const string CallbackName = "cap-callback-name";
public const string SentTime = "cap-senttime"; public const string SentTime = "cap-senttime";
......
using System.Threading.Tasks; // Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Threading.Tasks;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Transport namespace DotNetCore.CAP.Transport
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment