Commit 71e3e1d4 authored by Savorboard's avatar Savorboard

Fix message consumer bugs

parent aba3ba9e
...@@ -71,7 +71,7 @@ namespace Sample.RabbitMQ.MySql.Controllers ...@@ -71,7 +71,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
[CapSubscribe("sample.rabbitmq.mysql")] [CapSubscribe("sample.rabbitmq.mysql")]
public void Subscriber(DateTime time) public void Subscriber(DateTime time)
{ {
//Console.WriteLine($@"{DateTime.Now}, Subscriber invoked, Sent time:{time}"); Console.WriteLine($@"{DateTime.Now}, Subscriber invoked, Sent time:{time}");
} }
} }
} }
...@@ -103,7 +103,7 @@ namespace DotNetCore.CAP.MySql ...@@ -103,7 +103,7 @@ namespace DotNetCore.CAP.MySql
return message; return message;
} }
public async Task<MediumMessage> StoreMessageAsync(string name, string group, Message content, CancellationToken cancellationToken = default) public Task<MediumMessage> StoreMessageAsync(string name, string group, Message content, CancellationToken cancellationToken = default)
{ {
var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
...@@ -125,14 +125,15 @@ namespace DotNetCore.CAP.MySql ...@@ -125,14 +125,15 @@ namespace DotNetCore.CAP.MySql
Retries = message.Retries, Retries = message.Retries,
Added = message.Added, Added = message.Added,
ExpiresAt = message.ExpiresAt, ExpiresAt = message.ExpiresAt,
StatusName = StatusName.Scheduled StatusName = nameof(StatusName.Scheduled)
}; };
using (var connection = new MySqlConnection(_options.Value.ConnectionString)) using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{ {
await connection.ExecuteAsync(sql, po); connection.Execute(sql, po);
} }
return message;
return Task.FromResult(message);
} }
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default)
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text;
using System.Threading; using System.Threading;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
...@@ -163,11 +164,14 @@ namespace DotNetCore.CAP.RabbitMQ ...@@ -163,11 +164,14 @@ namespace DotNetCore.CAP.RabbitMQ
{ {
_deliveryTag = e.DeliveryTag; _deliveryTag = e.DeliveryTag;
var header = e.BasicProperties.Headers var headers = new Dictionary<string, string>();
.ToDictionary(x => x.Key, x => x.Value.ToString()); foreach (var header in e.BasicProperties.Headers)
header.Add(Headers.Group, _queueName); {
headers.Add(header.Key, header.Value == null ? null : Encoding.UTF8.GetString((byte[])header.Value));
}
headers.Add(Headers.Group, _queueName);
var message = new TransportMessage(header, e.Body); var message = new TransportMessage(headers, e.Body);
OnMessageReceived?.Invoke(sender, message); OnMessageReceived?.Invoke(sender, message);
} }
......
...@@ -36,6 +36,8 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -36,6 +36,8 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<CapMarkerService>(); services.TryAddSingleton<CapMarkerService>();
services.TryAddSingleton<ICapPublisher, CapPublisher>();
//Serializer and model binder //Serializer and model binder
services.TryAddSingleton<IContentSerializer, JsonContentSerializer>(); services.TryAddSingleton<IContentSerializer, JsonContentSerializer>();
services.TryAddSingleton<IMessagePacker, DefaultMessagePacker>(); services.TryAddSingleton<IMessagePacker, DefaultMessagePacker>();
...@@ -55,6 +57,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -55,6 +57,7 @@ namespace Microsoft.Extensions.DependencyInjection
//Queue's message processor //Queue's message processor
services.TryAddSingleton<MessageNeedToRetryProcessor>(); services.TryAddSingleton<MessageNeedToRetryProcessor>();
services.TryAddSingleton<TransportCheckProcessor>(); services.TryAddSingleton<TransportCheckProcessor>();
services.TryAddSingleton<CollectorProcessor>();
//Sender and Executors //Sender and Executors
services.TryAddSingleton<IMessageSender, MessageSender>(); services.TryAddSingleton<IMessageSender, MessageSender>();
......
...@@ -160,6 +160,7 @@ namespace DotNetCore.CAP ...@@ -160,6 +160,7 @@ namespace DotNetCore.CAP
var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();
var message = await _serializer.DeserializeAsync(messageContext); var message = await _serializer.DeserializeAsync(messageContext);
var mediumMessage = await _storage.StoreMessageAsync(message.GetName(), message.GetGroup(), message); var mediumMessage = await _storage.StoreMessageAsync(message.GetName(), message.GetGroup(), message);
client.Commit(); client.Commit();
......
...@@ -14,14 +14,15 @@ using DotNetCore.CAP.Persistence; ...@@ -14,14 +14,15 @@ using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Processor; using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
internal class DefaultSubscriberExecutor : ISubscriberExecutor internal class DefaultSubscriberExecutor : ISubscriberExecutor
{ {
private readonly ICapPublisher _sender;
private readonly IDataStorage _dataStorage; private readonly IDataStorage _dataStorage;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IServiceProvider _provider;
private readonly CapOptions _options; private readonly CapOptions _options;
private readonly MethodMatcherCache _selector; private readonly MethodMatcherCache _selector;
...@@ -33,18 +34,17 @@ namespace DotNetCore.CAP ...@@ -33,18 +34,17 @@ namespace DotNetCore.CAP
public DefaultSubscriberExecutor( public DefaultSubscriberExecutor(
ILogger<DefaultSubscriberExecutor> logger, ILogger<DefaultSubscriberExecutor> logger,
IOptions<CapOptions> options, IOptions<CapOptions> options,
IConsumerInvokerFactory consumerInvokerFactory, IServiceProvider provider,
ICapPublisher sender,
IDataStorage dataStorage,
MethodMatcherCache selector) MethodMatcherCache selector)
{ {
_selector = selector; _selector = selector;
_sender = sender;
_options = options.Value;
_dataStorage = dataStorage;
_logger = logger;
Invoker = consumerInvokerFactory.CreateInvoker(); _provider = provider;
_logger = logger;
_options = options.Value;
_dataStorage = _provider.GetService<IDataStorage>();
Invoker = _provider.GetService<IConsumerInvokerFactory>().CreateInvoker();
} }
private IConsumerInvoker Invoker { get; } private IConsumerInvoker Invoker { get; }
...@@ -190,7 +190,7 @@ namespace DotNetCore.CAP ...@@ -190,7 +190,7 @@ namespace DotNetCore.CAP
[Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString() [Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString()
}; };
await _sender.PublishAsync(ret.CallbackName, ret.Result, header, cancellationToken); await _provider.GetService<ICapPublisher>().PublishAsync(ret.CallbackName, ret.Result, header, cancellationToken);
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment