Commit 9b67cc4d authored by yangxiaodong's avatar yangxiaodong

Refactor.

parent b478b77d
......@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
/// <summary>
/// Gets or sets the <see cref="DbSet{ConsistencyMessage}"/> of Messages.
/// </summary>
public DbSet<ConsistencyMessage> Messages { get; set; }
public DbSet<CapMessage> Messages { get; set; }
/// <summary>
/// Configures the schema for the identity framework.
......@@ -34,7 +34,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore
/// </param>
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<ConsistencyMessage>(b =>
modelBuilder.Entity<CapMessage>(b =>
{
b.HasKey(m => m.Id);
b.ToTable("ConsistencyMessages");
......
......@@ -18,7 +18,7 @@ namespace Microsoft.Extensions.DependencyInjection
public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder)
where TContext : DbContext
{
builder.Services.AddScoped<ICapMessageStore, ConsistencyMessageStore<TContext>>();
builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();
return builder;
}
......
......@@ -83,14 +83,14 @@ namespace DotNetCore.CAP.Kafka
var messageStore = provider.GetRequiredService<ICapMessageStore>();
try
{
var message = await messageStore.GetFirstEnqueuedMessageAsync(_cts.Token);
var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync();
if (message != null)
{
var sp = Stopwatch.StartNew();
message.Status = MessageStatus.Processing;
await messageStore.UpdateAsync(message, _cts.Token);
message.StateName = StateName.Processing;
await messageStore.UpdateSentMessageAsync(message);
var jobResult = ExecuteJob(message.Topic, message.Payload);
var jobResult = ExecuteJob(message.KeyName, message.Content);
sp.Stop();
......@@ -100,14 +100,15 @@ namespace DotNetCore.CAP.Kafka
}
else
{
message.Status = MessageStatus.Successed;
await messageStore.UpdateAsync(message, _cts.Token);
//await messageStore.DeleteAsync(message, _cts.Token);
//TODO : the state will be deleted when release.
message.StateName = StateName.Succeeded;
await messageStore.UpdateSentMessageAsync(message);
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
}
}
catch (Exception )
catch (Exception)
{
return false;
}
......
......@@ -14,7 +14,7 @@ namespace DotNetCore.CAP.Kafka
private Consumer<Null, string> _consumerClient;
public event EventHandler<DeliverMessage> MessageReceieved;
public event EventHandler<MessageBase> MessageReceieved;
public IDeserializer<string> StringDeserializer { get; set; }
......@@ -69,11 +69,10 @@ namespace DotNetCore.CAP.Kafka
private void ConsumerClient_OnMessage(object sender, Message<Null, string> e)
{
var message = new DeliverMessage
var message = new MessageBase
{
MessageKey = e.Topic,
Value = e.Value,
Body = Encoding.UTF8.GetBytes(e.Value)
KeyName = e.Topic,
Content = e.Value
};
MessageReceieved?.Invoke(sender, message);
}
......
......@@ -11,7 +11,8 @@ namespace DotNetCore.CAP.Kafka
: this(topicName, partition, 0) { }
public KafkaTopicAttribute(string topicName, int partition, long offset)
: base(topicName) {
: base(topicName)
{
Offset = offset;
Partition = partition;
}
......@@ -24,7 +25,8 @@ namespace DotNetCore.CAP.Kafka
public bool IsOffset { get { return Offset == 0; } }
public override string ToString() {
public override string ToString()
{
return Name;
}
}
......
......@@ -19,7 +19,7 @@ namespace DotNetCore.CAP.RabbitMQ
private string _queueName;
public event EventHandler<DeliverMessage> MessageReceieved;
public event EventHandler<MessageBase> MessageReceieved;
public RabbitMQConsumerClient(string exchange, string hostName)
{
......@@ -65,11 +65,10 @@ namespace DotNetCore.CAP.RabbitMQ
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
{
var message = new DeliverMessage
var message = new MessageBase
{
MessageKey = e.RoutingKey,
Body = e.Body,
Value = Encoding.UTF8.GetString(e.Body)
KeyName = e.RoutingKey,
Content = Encoding.UTF8.GetString(e.Body)
};
MessageReceieved?.Invoke(sender, message);
}
......
......@@ -38,7 +38,8 @@ namespace DotNetCore.CAP.RabbitMQ
}
}
public Task SendAsync<T>(string topic, T contentObj) {
public Task SendAsync<T>(string topic, T contentObj)
{
throw new NotImplementedException();
}
}
......
......@@ -13,7 +13,7 @@ namespace DotNetCore.CAP.Abstractions
/// </summary>
/// <param name="descriptor">consumer method descriptor. </param>
/// <param name="message"> reveied message.</param>
public ConsumerContext(ConsumerExecutorDescriptor descriptor, DeliverMessage message)
public ConsumerContext(ConsumerExecutorDescriptor descriptor, MessageBase message)
{
ConsumerDescriptor = descriptor ?? throw new ArgumentNullException(nameof(descriptor));
DeliverMessage = message ?? throw new ArgumentNullException(nameof(message));
......@@ -27,6 +27,6 @@ namespace DotNetCore.CAP.Abstractions
/// <summary>
/// consumer reveived message.
/// </summary>
public DeliverMessage DeliverMessage { get; set; }
public MessageBase DeliverMessage { get; set; }
}
}
\ No newline at end of file
using System.Collections.Generic;
using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP.Abstractions
{
/// <summary>
/// Defines an interface for selecting an cosumer service method to invoke for the current message.
/// Defines an interface for selecting an cosumer service method to invoke for the current message.
/// </summary>
public interface IConsumerServiceSelector
{
......@@ -17,8 +16,8 @@ namespace DotNetCore.CAP.Abstractions
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(CapStartContext context);
/// <summary>
/// Selects the best <see cref="ConsumerExecutorDescriptor"/> candidate from <paramref name="candidates"/> for the
/// current message associated with <paramref name="context"/>.
/// Selects the best <see cref="ConsumerExecutorDescriptor"/> candidate from <paramref name="candidates"/> for the
/// current message associated.
/// </summary>
/// <param name="key">topic or exchange router key.</param>
/// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor"/> candidates.</param>
......
......@@ -21,7 +21,7 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
/// </para>
/// <para>
/// A model binder that completes successfully should set <see cref="ModelBindingContext.Result"/> to
/// a value returned from <see cref="ModelBindingResult.Success"/>.
/// a value returned from <see cref="ModelBindingResult.Success"/>.
/// </para>
/// </returns>
Task BindModelAsync(ModelBindingContext bindingContext);
......
......@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
public object Model { get; set; }
/// <summary>
/// Gets or sets the name of the model.
/// Gets or sets the name of the model.
/// </summary>
public string ModelName { get; set; }
......
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP
......@@ -11,49 +10,40 @@ namespace DotNetCore.CAP
public interface ICapMessageStore
{
/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
/// <param name="messageId">The message ID to search for.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the message matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
Task<ConsistencyMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken);
/// <param name="message">The message to create in the store.</param>
Task<OperateResult> StoreSentMessageAsync(CapSentMessage message);
/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// Fetches the next message to be executed.
/// </summary>
/// <param name="message">The message to create in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> CreateAsync(ConsistencyMessage message, CancellationToken cancellationToken);
/// <returns></returns>
Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync();
/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> UpdateAsync(ConsistencyMessage message, CancellationToken cancellationToken);
Task<OperateResult> UpdateSentMessageAsync(CapSentMessage message);
/// <summary>
/// Deletes a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to delete in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> DeleteAsync(ConsistencyMessage message, CancellationToken cancellationToken);
Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message);
/// <summary>
/// Gets the ID for a message from the store as an asynchronous operation.
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message whose ID should be returned.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that contains the ID of the message.</returns>
Task<string> GeConsistencyMessageIdAsync(ConsistencyMessage message, CancellationToken cancellationToken);
Task<ConsistencyMessage> GetFirstEnqueuedMessageAsync(CancellationToken cancellationToken);
/// <param name="message"></param>
/// <returns></returns>
Task<OperateResult> StoreReceivedMessageAsync(CapReceivedMessage message);
// void ChangeState(ConsistencyMessage message, MessageStatus status);
/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
Task<OperateResult> UpdateReceivedMessageAsync(CapReceivedMessage message);
}
}
\ No newline at end of file
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Logging;
......@@ -13,7 +12,6 @@ namespace DotNetCore.CAP
{
private readonly ICapMessageStore _store;
private readonly ILogger _logger;
private readonly CancellationTokenSource _cts;
public DefaultProducerService(
ICapMessageStore store,
......@@ -21,7 +19,6 @@ namespace DotNetCore.CAP
{
_store = store;
_logger = logger;
_cts = new CancellationTokenSource();
}
public Task SendAsync(string topic, string content)
......@@ -45,20 +42,17 @@ namespace DotNetCore.CAP
private async Task StoreMessage(string topic, string content)
{
var message = new ConsistencyMessage
var message = new CapSentMessage
{
Topic = topic,
Payload = content
KeyName = topic,
Content = content
};
await _store.CreateAsync(message, _cts.Token);
await _store.StoreSentMessageAsync(message);
WaitHandleEx.PulseEvent.Set();
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Enqueuing a topic to be store. topic:{topic}, content:{content}", topic, content);
}
_logger.EnqueuingMessage(topic, content);
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP
{
/// <summary>
/// consumer client
/// consumer client
/// </summary>
public interface IConsumerClient : IDisposable
{
......@@ -16,6 +14,6 @@ namespace DotNetCore.CAP
void Listening(TimeSpan timeout);
event EventHandler<DeliverMessage> MessageReceieved;
event EventHandler<MessageBase> MessageReceieved;
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace DotNetCore.CAP
namespace DotNetCore.CAP
{
/// <summary>
/// Consumer client factory to create consumer client instance.
......@@ -18,4 +13,4 @@ namespace DotNetCore.CAP
/// <returns></returns>
IConsumerClient Create(string groupId, string clientHostAddress);
}
}
}
\ No newline at end of file
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
......@@ -24,7 +23,7 @@ namespace DotNetCore.CAP
private readonly ICapMessageStore _messageStore;
private readonly CancellationTokenSource _cts;
public event EventHandler<ConsistencyMessage> MessageReceieved;
public event EventHandler<CapMessage> MessageReceieved;
private CapStartContext _context;
private Task _compositeTask;
......@@ -37,7 +36,8 @@ namespace DotNetCore.CAP
ILoggerFactory loggerFactory,
ICapMessageStore messageStore,
MethodMatcherCache selector,
IOptions<CapOptions> options) {
IOptions<CapOptions> options)
{
_selector = selector;
_logger = loggerFactory.CreateLogger<ConsumerHandler>();
_loggerFactory = loggerFactory;
......@@ -49,23 +49,29 @@ namespace DotNetCore.CAP
_cts = new CancellationTokenSource();
}
protected virtual void OnMessageReceieved(ConsistencyMessage message) {
protected virtual void OnMessageReceieved(CapMessage message)
{
MessageReceieved?.Invoke(this, message);
}
public void Start() {
public void Start()
{
_context = new CapStartContext(_serviceProvider, _cts.Token);
var matchs = _selector.GetCandidatesMethods(_context);
var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.GroupOrExchange);
foreach (var matchGroup in groupingMatchs) {
Task.Factory.StartNew(() => {
using (var client = _consumerClientFactory.Create(matchGroup.Key, _options.BrokerUrlList)) {
foreach (var matchGroup in groupingMatchs)
{
Task.Factory.StartNew(() =>
{
using (var client = _consumerClientFactory.Create(matchGroup.Key, _options.BrokerUrlList))
{
client.MessageReceieved += OnMessageReceieved;
foreach (var item in matchGroup) {
foreach (var item in matchGroup)
{
client.Subscribe(item.Key);
}
......@@ -76,19 +82,17 @@ namespace DotNetCore.CAP
_compositeTask = Task.CompletedTask;
}
public virtual void OnMessageReceieved(object sender, DeliverMessage message) {
var consistencyMessage = new ConsistencyMessage() {
Topic = message.MessageKey,
Payload = "Reveived:" + Encoding.UTF8.GetString(message.Body),
Status = MessageStatus.Received
};
public virtual void OnMessageReceieved(object sender, MessageBase message)
{
var capMessage = new CapReceivedMessage(message);
_logger.LogInformation("message receieved message topic name: " + consistencyMessage.Id);
_logger.LogInformation("message receieved message topic name: " + capMessage.Id);
_messageStore.CreateAsync(consistencyMessage, _cts.Token).Wait();
_messageStore.StoreReceivedMessageAsync(capMessage).Wait();
try {
var executeDescriptor = _selector.GetTopicExector(message.MessageKey);
try
{
var executeDescriptor = _selector.GetTopicExector(message.KeyName);
var consumerContext = new ConsumerContext(executeDescriptor, message);
......@@ -96,15 +100,18 @@ namespace DotNetCore.CAP
invoker.InvokeAsync();
_messageStore.UpdateAsync(consistencyMessage, _cts.Token).Wait();
_messageStore.UpdateReceivedMessageAsync(capMessage).Wait();
}
catch (Exception ex) {
catch (Exception ex)
{
_logger.LogError("exception raised when excute method : " + ex.Message);
}
}
public void Dispose() {
if (_disposed) {
public void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
......@@ -112,12 +119,15 @@ namespace DotNetCore.CAP
_logger.ServerShuttingDown();
_cts.Cancel();
try {
try
{
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds);
}
catch (AggregateException ex) {
catch (AggregateException ex)
{
var innerEx = ex.InnerExceptions[0];
if (!(innerEx is OperationCanceledException)) {
if (!(innerEx is OperationCanceledException))
{
_logger.ExpectedOperationCanceledException(innerEx);
}
}
......
using System;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding;
......@@ -41,13 +40,13 @@ namespace DotNetCore.CAP.Internal
{
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
var bodyString = Encoding.UTF8.GetString(_consumerContext.DeliverMessage.Body);
var value = _consumerContext.DeliverMessage.Content;
if (_executor.MethodParameters.Length > 0)
{
var firstParameter = _executor.MethodParameters[0];
var bindingContext = ModelBindingContext.CreateBindingContext(bodyString,
var bindingContext = ModelBindingContext.CreateBindingContext(value,
firstParameter.Name, firstParameter.ParameterType);
_modelBinder.BindModelAsync(bindingContext);
......
......@@ -24,7 +24,8 @@ namespace DotNetCore.CAP.Internal
}
/// <summary>
///
/// Selects the best <see cref="ConsumerExecutorDescriptor"/> candidate from <paramref name="candidates"/> for the
/// current message associated.
/// </summary>
/// <param name="key"></param>
/// <param name="executeDescriptor"></param>
......
using System;
using System.Collections.Concurrent;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Internal
{
......
......@@ -17,6 +17,8 @@ namespace DotNetCore.CAP
private static Action<ILogger, string, double, Exception> _cronJobExecuted;
private static Action<ILogger, string, Exception> _cronJobFailed;
private static Action<ILogger, string, string, Exception> _enqueuingMessage;
static LoggerExtensions()
{
_serverStarting = LoggerMessage.Define<int, int>(
......@@ -53,6 +55,16 @@ namespace DotNetCore.CAP
LogLevel.Warning,
4,
"Cron job '{jobName}' failed to execute.");
_enqueuingMessage = LoggerMessage.Define<string, string>(
LogLevel.Debug,
2,
"Enqueuing a topic to the store. NameKey: {NameKey}. Content: {Content}");
}
public static void EnqueuingMessage(this ILogger logger, string nameKey, string content)
{
_enqueuingMessage(logger, nameKey, content, null);
}
public static void ServerStarting(this ILogger logger, int machineProcessorCount, int processorCount)
......
......@@ -54,6 +54,10 @@ namespace DotNetCore.CAP.Test
{
throw new NotImplementedException();
}
public Task SendAsync<T>(string topic, T contentObj) {
throw new NotImplementedException();
}
}
......@@ -67,32 +71,32 @@ namespace DotNetCore.CAP.Test
private class MyMessageStore : ICapMessageStore
{
public Task<OperateResult> CreateAsync(ConsistencyMessage message, CancellationToken cancellationToken)
public Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
{
throw new NotImplementedException();
}
public Task<OperateResult> DeleteAsync(ConsistencyMessage message, CancellationToken cancellationToken)
public Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message)
{
throw new NotImplementedException();
}
public Task<ConsistencyMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken)
public Task<OperateResult> StoreReceivedMessageAsync(CapReceivedMessage message)
{
throw new NotImplementedException();
}
public Task<string> GeConsistencyMessageIdAsync(ConsistencyMessage message, CancellationToken cancellationToken)
public Task<OperateResult> StoreSentMessageAsync(CapSentMessage message)
{
throw new NotImplementedException();
}
public Task<ConsistencyMessage> GetFirstEnqueuedMessageAsync(CancellationToken cancellationToken)
public Task<OperateResult> UpdateReceivedMessageAsync(CapReceivedMessage message)
{
throw new NotImplementedException();
}
public Task<OperateResult> UpdateAsync(ConsistencyMessage message, CancellationToken cancellationToken)
public Task<OperateResult> UpdateSentMessageAsync(CapSentMessage message)
{
throw new NotImplementedException();
}
......
......@@ -7,42 +7,32 @@ namespace DotNetCore.CAP.Test
{
public class NoopMessageStore : ICapMessageStore
{
public Task<OperateResult> CreateAsync(ConsistencyMessage message, CancellationToken cancellationToken)
public Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
{
throw new NotImplementedException();
}
public Task<OperateResult> DeleteAsync(ConsistencyMessage message, CancellationToken cancellationToken)
public Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message)
{
throw new NotImplementedException();
}
public void Dispose()
public Task<OperateResult> StoreReceivedMessageAsync(CapReceivedMessage message)
{
throw new NotImplementedException();
}
public Task<ConsistencyMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken)
public Task<OperateResult> StoreSentMessageAsync(CapSentMessage message)
{
throw new NotImplementedException();
}
public Task<string> GeConsistencyMessageIdAsync(ConsistencyMessage message, CancellationToken cancellationToken)
public Task<OperateResult> UpdateReceivedMessageAsync(CapReceivedMessage message)
{
throw new NotImplementedException();
}
public Task<ConsistencyMessage> GetFirstEnqueuedMessageAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public Task<string> GetMessageIdAsync(ConsistencyMessage message, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public Task<OperateResult> UpdateAsync(ConsistencyMessage message, CancellationToken cancellationToken)
public Task<OperateResult> UpdateSentMessageAsync(CapSentMessage message)
{
throw new NotImplementedException();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment