Commit 5d1c1bfb authored by Savorboard's avatar Savorboard

Architectural refactoring

parent 47b1dc5c
......@@ -68,6 +68,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.InMemoryStor
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AzureServiceBus.InMemory", "samples\Sample.AzureServiceBus.InMemory\Sample.AzureServiceBus.InMemory.csproj", "{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.Dashboard", "src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj", "{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -142,6 +144,10 @@ Global
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}.Release|Any CPU.Build.0 = Release|Any CPU
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -165,6 +171,7 @@ Global
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{58B6E829-C6C8-457C-9DD0-C600650254DF} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
......
......@@ -36,10 +36,10 @@ namespace Sample.RabbitMQ.MySql.Controllers
//your business code
connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);
for (int i = 0; i < 5; i++)
{
//for (int i = 0; i < 5; i++)
//{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
}
//}
transaction.Commit();
}
......@@ -68,10 +68,10 @@ namespace Sample.RabbitMQ.MySql.Controllers
}
[NonAction]
[CapSubscribe("#.rabbitmq.mysql")]
[CapSubscribe("sample.rabbitmq.mysql")]
public void Subscriber(DateTime time)
{
Console.WriteLine($@"{DateTime.Now}, Subscriber invoked, Sent time:{time}");
//Console.WriteLine($@"{DateTime.Now}, Subscriber invoked, Sent time:{time}");
}
}
}
......@@ -13,6 +13,7 @@ namespace Sample.RabbitMQ.MySql
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.UseUrls("http://*:15173")
.Build();
}
}
......@@ -21,7 +21,7 @@
"launchBrowser": true,
"launchUrl": "cap",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
"ASPNETCORE_ENVIRONMENT": "Production"
},
"applicationUrl": "http://localhost:57173/"
}
......
......@@ -15,12 +15,13 @@ namespace Sample.RabbitMQ.MySql
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("localhost");
x.UseRabbitMQ("192.168.2.120");
x.UseDashboard();
x.FailedRetryCount = 5;
x.FailedThresholdCallback = (type, name, content) =>
{
Console.WriteLine($@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {name}, message body: {content}");
Console.WriteLine(
$@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {name}, message body: {content}");
};
});
......
......@@ -2,7 +2,7 @@
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Debug"
"Default": "Error"
}
}
}
......@@ -24,7 +24,7 @@ namespace DotNetCore.CAP
services.Configure(_configure);
services.AddSingleton<IConsumerClientFactory, AzureServiceBusConsumerClientFactory>();
services.AddSingleton<IPublishExecutor, AzureServiceBusPublishMessageSender>();
services.AddSingleton<ITransportPublisher, AzureServiceBusPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, AzureServiceBusPublishMessageSender>();
}
}
......
......@@ -19,7 +19,7 @@ namespace DotNetCore.CAP.Dashboard.Resources {
// class via a tool like ResGen or Visual Studio.
// To add or remove a member, edit your .ResX file then rerun ResGen
// with the /str option, or rebuild your VS project.
[global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "15.0.0.0")]
[global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "16.0.0.0")]
[global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
[global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()]
public class Strings {
......
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<EmbeddedResource Include="Content\css\bootstrap.min.css" />
<EmbeddedResource Include="Content\css\cap.css" />
<EmbeddedResource Include="Content\css\jsonview.min.css" />
<EmbeddedResource Include="Content\css\rickshaw.min.css" />
<EmbeddedResource Include="Content\fonts\glyphicons-halflings-regular.eot" />
<EmbeddedResource Include="Content\fonts\glyphicons-halflings-regular.svg" />
<EmbeddedResource Include="Content\fonts\glyphicons-halflings-regular.ttf" />
<EmbeddedResource Include="Content\fonts\glyphicons-halflings-regular.woff" />
<EmbeddedResource Include="Content\fonts\glyphicons-halflings-regular.woff2" />
<EmbeddedResource Include="Content\js\bootstrap.min.js" />
<EmbeddedResource Include="Content\js\cap.js" />
<EmbeddedResource Include="Content\js\d3.layout.min.js" />
<EmbeddedResource Include="Content\js\d3.min.js" />
<EmbeddedResource Include="Content\js\jquery-2.1.4.min.js" />
<EmbeddedResource Include="Content\js\jsonview.min.js" />
<EmbeddedResource Include="Content\js\moment-with-locales.min.js" />
<EmbeddedResource Include="Content\js\moment.min.js" />
<EmbeddedResource Include="Content\js\rickshaw.min.js" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
<ItemGroup>
<Compile Update="Dashboard\Content\resx\Strings.Designer.cs">
<DesignTime>True</DesignTime>
<AutoGen>True</AutoGen>
<DependentUpon>Strings.resx</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_SidebarMenu.generated.cs">
<DependentUpon>_SidebarMenu.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\SidebarMenu.cs">
<DependentUpon>_SidebarMenu.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\ReceivedPage.generated.cs">
<DependentUpon>ReceivedPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\ReceivedPage.cs">
<DependentUpon>ReceivedPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_BlockMetric.generated.cs">
<DependentUpon>_BlockMetric.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\BlockMetric.cs">
<DependentUpon>_BlockMetric.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\Breadcrumbs.cs">
<DependentUpon>_Breadcrumbs.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_Breadcrumbs.generated.cs">
<DependentUpon>_Breadcrumbs.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_Paginator.generated.cs">
<DependentUpon>_Paginator.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_Paginator.cs">
<DependentUpon>_Paginator.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_PerPageSelector.cs">
<DependentUpon>_PerPageSelector.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_PerPageSelector.generated.cs">
<DependentUpon>_PerPageSelector.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\PublishedPage.cs">
<DependentUpon>PublishedPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\PublishedPage*.cs">
<DependentUpon>PublishedPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\LayoutPage.*.cs">
<DependentUpon>LayoutPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\LayoutPage.cs">
<DependentUpon>LayoutPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\InlineMetric.cs">
<DependentUpon>_InlineMetric.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_InlineMetric.generated.cs">
<DependentUpon>_InlineMetric.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_Navigation.generated.cs">
<DependentUpon>_Navigation.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\HomePage.generated.cs">
<DependentUpon>HomePage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\HomePage.cs">
<DependentUpon>HomePage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\SubscriberPage.generated.cs">
<DependentUpon>SubscriberPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\NodePage*.cs">
<DependentUpon>NodePage.cshtml</DependentUpon>
</Compile>
</ItemGroup>
<ItemGroup>
<EmbeddedResource Update="Content\resx\Strings.resx">
<CustomToolNamespace>DotNetCore.CAP.Dashboard.Resources</CustomToolNamespace>
<LastGenOutput>Strings.Designer.cs</LastGenOutput>
<Generator>PublicResXFileCodeGenerator</Generator>
</EmbeddedResource>
</ItemGroup>
<ItemGroup>
<EmbeddedResource Update="Dashboard\Content\resx\Strings.resx">
<Generator>PublicResXFileCodeGenerator</Generator>
<CustomToolNamespace>DotNetCore.CAP.Dashboard.Resources</CustomToolNamespace>
<LastGenOutput>Strings.Designer.cs</LastGenOutput>
</EmbeddedResource>
</ItemGroup>
</Project>
......@@ -11,7 +11,7 @@ using System.Text.RegularExpressions;
using DotNetCore.CAP.Dashboard.Pages;
using DotNetCore.CAP.Dashboard.Resources;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Internal;
namespace DotNetCore.CAP.Dashboard
......
......@@ -4,7 +4,7 @@
using System;
using System.Collections.Generic;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Dashboard
{
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Dashboard.Monitoring
{
......
@* Generator: Template TypeVisibility: Internal GeneratePrettyNames: True *@
@using DotNetCore.CAP.Dashboard.Pages
@using DotNetCore.CAP.Dashboard.Resources
@using DotNetCore.CAP.Models
@using DotNetCore.CAP.Messages
@using Newtonsoft.Json
@inherits DotNetCore.CAP.Dashboard.RazorPage
@{
......
#pragma warning disable 1591
using DotNetCore.CAP.Messages;
#pragma warning disable 1591
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
......@@ -45,8 +47,6 @@ namespace DotNetCore.CAP.Dashboard.Pages
#line hidden
#line 4 "..\..\HomePage.cshtml"
using DotNetCore.CAP.Models;
#line default
#line hidden
......
......@@ -4,7 +4,7 @@
@using DotNetCore.CAP.Dashboard.Monitoring
@using DotNetCore.CAP.Dashboard.Pages
@using DotNetCore.CAP.Dashboard.Resources
@using DotNetCore.CAP.Models
@using DotNetCore.CAP.Messages
@inherits DotNetCore.CAP.Dashboard.RazorPage
@{
Layout = new LayoutPage(Strings.PublishedMessagesPage_Title);
......
#pragma warning disable 1591
using DotNetCore.CAP.Messages;
#pragma warning disable 1591
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
......@@ -46,7 +48,6 @@ namespace DotNetCore.CAP.Dashboard.Pages
#line hidden
#line 3 "..\..\PublishedPage.cshtml"
using DotNetCore.CAP.Models;
#line default
#line hidden
......
......@@ -4,7 +4,7 @@
@using DotNetCore.CAP.Dashboard.Monitoring
@using DotNetCore.CAP.Dashboard.Pages
@using DotNetCore.CAP.Dashboard.Resources
@using DotNetCore.CAP.Models
@using DotNetCore.CAP.Messages
@inherits DotNetCore.CAP.Dashboard.RazorPage
@{
Layout = new LayoutPage(Strings.ReceivedMessagesPage_Title);
......
#pragma warning disable 1591
using DotNetCore.CAP.Messages;
#pragma warning disable 1591
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
......@@ -46,7 +48,6 @@ namespace DotNetCore.CAP.Dashboard.Pages
#line hidden
#line 3 "..\..\ReceivedPage.cshtml"
using DotNetCore.CAP.Models;
#line default
#line hidden
......
......@@ -5,7 +5,7 @@ using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.InMemoryStorage
......
......@@ -7,7 +7,7 @@ using System.Linq;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.InMemoryStorage
{
......
......@@ -7,7 +7,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.InMemoryStorage
......
......@@ -4,7 +4,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.InMemoryStorage
{
......
......@@ -24,7 +24,7 @@ namespace DotNetCore.CAP
services.Configure(_configure);
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddSingleton<IPublishExecutor, KafkaPublishMessageSender>();
services.AddSingleton<ITransportPublisher, KafkaPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, KafkaPublishMessageSender>();
services.AddSingleton<IConnectionPool,ConnectionPool>();
}
......
......@@ -5,7 +5,7 @@ using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
......
......@@ -6,7 +6,7 @@ using System.Collections.Generic;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Driver;
......
......@@ -5,7 +5,7 @@ using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
......
......@@ -3,7 +3,7 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
......
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.MongoDB
{
......
......@@ -3,7 +3,6 @@
using System;
using DotNetCore.CAP.MySql;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
......@@ -22,12 +21,7 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapStorageMarkerService>();
services.AddSingleton<IStorage, MySqlStorage>();
services.AddSingleton<IStorageConnection, MySqlStorageConnection>();
services.AddSingleton<ICapPublisher, MySqlPublisher>();
services.AddSingleton<ICallbackPublisher>(provider => (MySqlPublisher)provider.GetService<ICapPublisher>());
services.AddSingleton<ICollectProcessor, MySqlCollectProcessor>();
//services.AddSingleton<IStorageConnection, MySqlStorageConnection>();
services.AddTransient<CapTransactionBase, MySqlCapTransaction>();
//Add MySqlOptions
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly MySqlOptions _options;
public MySqlPublisher(IServiceProvider provider) : base(provider)
{
_options = provider.GetService<IOptions<MySqlOptions>>().Value;
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
{
await PublishAsyncInternal(message);
}
protected override async Task ExecuteAsync(CapPublishedMessage message,
ICapTransaction transaction = null,
CancellationToken cancel = default(CancellationToken))
{
if (transaction == null)
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(PrepareSql(), message);
return;
}
}
var dbTrans = transaction.DbTransaction as IDbTransaction;
if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans)
{
dbTrans = dbContextTrans.GetDbTransaction();
}
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(PrepareSql(), message, dbTrans);
}
#region private methods
private string PrepareSql()
{
return
$"INSERT INTO `{_options.TableNamePrefix}.published` (`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)" +
$"VALUES(@Id,'{_options.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
#endregion private methods
}
}
\ No newline at end of file
......@@ -3,7 +3,6 @@
using System.Data;
using System.Diagnostics;
using System.Threading;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using Dapper;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.MySql
{
internal class MySqlMonitoringApi : IMonitoringApi
{
private readonly string _prefix;
private readonly MySqlStorage _storage;
public MySqlMonitoringApi(IStorage storage, IOptions<MySqlOptions> options)
{
_storage = storage as MySqlStorage ?? throw new ArgumentNullException(nameof(storage));
_prefix = options.Value.TableNamePrefix ?? throw new ArgumentNullException(nameof(options));
}
public StatisticsDto GetStatistics()
{
var sql = string.Format(@"
set transaction isolation level read committed;
select count(Id) from `{0}.published` where StatusName = N'Succeeded';
select count(Id) from `{0}.received` where StatusName = N'Succeeded';
select count(Id) from `{0}.published` where StatusName = N'Failed';
select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
var statistics = UseConnection(connection =>
{
var stats = new StatisticsDto();
using (var multi = connection.QueryMultiple(sql))
{
stats.PublishedSucceeded = multi.ReadSingle<int>();
stats.ReceivedSucceeded = multi.ReadSingle<int>();
stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();
}
return stats;
});
return statistics;
}
public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "published" : "received";
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, StatusName.Failed));
}
public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "published" : "received";
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, StatusName.Succeeded));
}
public IList<MessageDto> Messages(MessageQueryDto queryDto)
{
var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received";
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
{
where += " and StatusName=@StatusName";
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and Name=@Name";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and `Group`=@Group";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and Content like '%@Content%'";
}
var sqlQuery =
$"select * from `{_prefix}.{tableName}` where 1=1 {where} order by Added desc limit @Limit offset @Offset";
return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new
{
queryDto.StatusName,
queryDto.Group,
queryDto.Name,
queryDto.Content,
Offset = queryDto.CurrentPage * queryDto.PageSize,
Limit = queryDto.PageSize
}).ToList());
}
public int PublishedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Failed));
}
public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Succeeded));
}
public int ReceivedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Failed));
}
public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Succeeded));
}
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
return count;
}
private T UseConnection<T>(Func<IDbConnection, T> action)
{
return _storage.UseConnection(action);
}
private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName,
string statusName)
{
var endDate = DateTime.Now;
var dates = new List<DateTime>();
for (var i = 0; i < 24; i++)
{
dates.Add(endDate);
endDate = endDate.AddHours(-1);
}
var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x);
return GetTimelineStats(connection, tableName, statusName, keyMaps);
}
private Dictionary<DateTime, int> GetTimelineStats(
IDbConnection connection,
string tableName,
string statusName,
IDictionary<string, DateTime> keyMaps)
{
var sqlQuery =
$@"
select aggr.* from (
select date_format(`Added`,'%Y-%m-%d-%H') as `Key`,
count(id) `Count`
from `{_prefix}.{tableName}`
where StatusName = @statusName
group by date_format(`Added`,'%Y-%m-%d-%H')
) aggr where `Key` in @keys;";
var valuesMap = connection.Query<TimelineCounter>(
sqlQuery,
new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys)
{
if (!valuesMap.ContainsKey(key))
{
valuesMap.Add(key, 0);
}
}
var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
{
var value = valuesMap[keyMaps.ElementAt(i).Key];
result.Add(keyMaps.ElementAt(i).Value, value);
}
return result;
}
}
}
\ No newline at end of file
//// Copyright (c) .NET Core Community. All rights reserved.
//// Licensed under the MIT License. See License.txt in the project root for license information.
//using System;
//using System.Collections.Generic;
//using System.Data;
//using System.Linq;
//using Dapper;
//using DotNetCore.CAP.Dashboard;
//using DotNetCore.CAP.Dashboard.Monitoring;
//using DotNetCore.CAP.Infrastructure;
//using DotNetCore.CAP.Messages;
//using Microsoft.Extensions.Options;
//using MySql.Data.MySqlClient;
//namespace DotNetCore.CAP.MySql
//{
// internal class MySqlMonitoringApi : IMonitoringApi
// {
// private readonly IDbConnection _existingConnection = null;
// private readonly string _prefix;
// private readonly string _connectionString;
// public MySqlMonitoringApi(IOptions<MySqlOptions> options)
// {
// _prefix = options.Value.TableNamePrefix ?? throw new ArgumentNullException(nameof(options));
// _connectionString = options.Value.ConnectionString;
// }
// public StatisticsDto GetStatistics()
// {
// var sql = string.Format(@"
//set transaction isolation level read committed;
//select count(Id) from `{0}.published` where StatusName = N'Succeeded';
//select count(Id) from `{0}.received` where StatusName = N'Succeeded';
//select count(Id) from `{0}.published` where StatusName = N'Failed';
//select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
// var statistics = UseConnection(connection =>
// {
// var stats = new StatisticsDto();
// using (var multi = connection.QueryMultiple(sql))
// {
// stats.PublishedSucceeded = multi.ReadSingle<int>();
// stats.ReceivedSucceeded = multi.ReadSingle<int>();
// stats.PublishedFailed = multi.ReadSingle<int>();
// stats.ReceivedFailed = multi.ReadSingle<int>();
// }
// return stats;
// });
// return statistics;
// }
// public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
// {
// var tableName = type == MessageType.Publish ? "published" : "received";
// return UseConnection(connection =>
// GetHourlyTimelineStats(connection, tableName, StatusName.Failed));
// }
// public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
// {
// var tableName = type == MessageType.Publish ? "published" : "received";
// return UseConnection(connection =>
// GetHourlyTimelineStats(connection, tableName, StatusName.Succeeded));
// }
// public IList<MessageDto> Messages(MessageQueryDto queryDto)
// {
// var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received";
// var where = string.Empty;
// if (!string.IsNullOrEmpty(queryDto.StatusName))
// {
// where += " and StatusName=@StatusName";
// }
// if (!string.IsNullOrEmpty(queryDto.Name))
// {
// where += " and Name=@Name";
// }
// if (!string.IsNullOrEmpty(queryDto.Group))
// {
// where += " and `Group`=@Group";
// }
// if (!string.IsNullOrEmpty(queryDto.Content))
// {
// where += " and Content like '%@Content%'";
// }
// var sqlQuery =
// $"select * from `{_prefix}.{tableName}` where 1=1 {where} order by Added desc limit @Limit offset @Offset";
// return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new
// {
// queryDto.StatusName,
// queryDto.Group,
// queryDto.Name,
// queryDto.Content,
// Offset = queryDto.CurrentPage * queryDto.PageSize,
// Limit = queryDto.PageSize
// }).ToList());
// }
// public int PublishedFailedCount()
// {
// return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Failed));
// }
// public int PublishedSucceededCount()
// {
// return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Succeeded));
// }
// public int ReceivedFailedCount()
// {
// return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Failed));
// }
// public int ReceivedSucceededCount()
// {
// return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Succeeded));
// }
// private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
// {
// var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
// var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
// return count;
// }
// private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName,
// string statusName)
// {
// var endDate = DateTime.Now;
// var dates = new List<DateTime>();
// for (var i = 0; i < 24; i++)
// {
// dates.Add(endDate);
// endDate = endDate.AddHours(-1);
// }
// var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x);
// return GetTimelineStats(connection, tableName, statusName, keyMaps);
// }
// private Dictionary<DateTime, int> GetTimelineStats(
// IDbConnection connection,
// string tableName,
// string statusName,
// IDictionary<string, DateTime> keyMaps)
// {
// var sqlQuery =
// $@"
//select aggr.* from (
// select date_format(`Added`,'%Y-%m-%d-%H') as `Key`,
// count(id) `Count`
// from `{_prefix}.{tableName}`
// where StatusName = @statusName
// group by date_format(`Added`,'%Y-%m-%d-%H')
//) aggr where `Key` in @keys;";
// var valuesMap = connection.Query<TimelineCounter>(
// sqlQuery,
// new { keys = keyMaps.Keys, statusName })
// .ToDictionary(x => x.Key, x => x.Count);
// foreach (var key in keyMaps.Keys)
// {
// if (!valuesMap.ContainsKey(key))
// {
// valuesMap.Add(key, 0);
// }
// }
// var result = new Dictionary<DateTime, int>();
// for (var i = 0; i < keyMaps.Count; i++)
// {
// var value = valuesMap[keyMaps.ElementAt(i).Key];
// result.Add(keyMaps.ElementAt(i).Value, value);
// }
// return result;
// }
// private T UseConnection<T>(Func<IDbConnection, T> func)
// {
// IDbConnection connection = null;
// try
// {
// connection = CreateAndOpenConnection();
// return func(connection);
// }
// finally
// {
// ReleaseConnection(connection);
// }
// }
// private IDbConnection CreateAndOpenConnection()
// {
// var connection = _existingConnection ?? new MySqlConnection(_connectionString);
// if (connection.State == ConnectionState.Closed)
// {
// connection.Open();
// }
// return connection;
// }
// private bool IsExistingConnection(IDbConnection connection)
// {
// return connection != null && ReferenceEquals(connection, _existingConnection);
// }
// private void ReleaseConnection(IDbConnection connection)
// {
// if (connection != null && !IsExistingConnection(connection))
// {
// connection.Dispose();
// }
// }
// }
//}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlStorage : IStorage
public class MySqlStorageInitializer : IStorageInitializer
{
private readonly IOptions<CapOptions> _capOptions;
private readonly IOptions<MySqlOptions> _options;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
public MySqlStorage(
ILogger<MySqlStorage> logger,
public MySqlStorageInitializer(
ILogger<MySqlStorageInitializer> logger,
IOptions<MySqlOptions> options,
IOptions<CapOptions> capOptions)
{
_options = options;
_capOptions = capOptions;
_logger = logger;
}
public IStorageConnection GetConnection()
public string GetPublishedTableName()
{
return new MySqlStorageConnection(_options, _capOptions);
return $"{_options.Value.TableNamePrefix}.published";
}
public IMonitoringApi GetMonitoringApi()
public string GetReceivedTableName()
{
return new MySqlMonitoringApi(this, _options);
return $"{_options.Value.TableNamePrefix}.received";
}
public async Task InitializeAsync(CancellationToken cancellationToken)
......@@ -56,6 +51,7 @@ namespace DotNetCore.CAP.MySql
_logger.LogDebug("Ensuring all create database tables script are applied.");
}
protected virtual string CreateDbTablesScript(string prefix)
{
var batchSql =
......@@ -87,45 +83,5 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
";
return batchSql;
}
internal T UseConnection<T>(Func<IDbConnection, T> func)
{
IDbConnection connection = null;
try
{
connection = CreateAndOpenConnection();
return func(connection);
}
finally
{
ReleaseConnection(connection);
}
}
internal IDbConnection CreateAndOpenConnection()
{
var connection = _existingConnection ?? new MySqlConnection(_options.Value.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
connection.Open();
}
return connection;
}
internal bool IsExistingConnection(IDbConnection connection)
{
return connection != null && ReferenceEquals(connection, _existingConnection);
}
internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlStorageConnection : IStorageConnection
{
private readonly CapOptions _capOptions;
private readonly IOptions<MySqlOptions> _options;
private readonly string _prefix;
public MySqlStorageConnection(IOptions<MySqlOptions> options, IOptions<CapOptions> capOptions)
{
_options = options;
_capOptions = capOptions.Value;
_prefix = options.Value.TableNamePrefix;
}
public MySqlOptions Options => _options.Value;
public IStorageTransaction CreateTransaction()
{
return new MySqlStorageTransaction(this);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
{
var sql = $@"SELECT * FROM `{_prefix}.published` WHERE `Id`={id};";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}
public async Task<IEnumerable<CapPublishedMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_prefix}.published` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Version`='{_capOptions.Version}' AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}
public void StoreReceivedMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql = $@"
INSERT INTO `{_prefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Id,'{_capOptions.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
connection.Execute(sql, message);
}
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
{
var sql = $@"SELECT * FROM `{_prefix}.received` WHERE Id={id};";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_prefix}.received` WHERE `Retries`<{_capOptions.FailedRetryCount} AND `Version`='{_capOptions.Version}' AND `Added`<'{fourMinsAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
}
}
public bool ChangePublishedState(long messageId, string state)
{
var sql =
$"UPDATE `{_prefix}.published` SET `Retries`=`Retries`+1,`ExpiresAt`=NULL,`StatusName` = '{state}' WHERE `Id`={messageId}";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}
public bool ChangeReceivedState(long messageId, string state)
{
var sql =
$"UPDATE `{_prefix}.received` SET `Retries`=`Retries`+1,`ExpiresAt`=NULL,`StatusName` = '{state}' WHERE `Id`={messageId}";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlStorageTransaction : IStorageTransaction
{
private readonly IDbConnection _dbConnection;
private readonly string _prefix;
public MySqlStorageTransaction(MySqlStorageConnection connection)
{
var options = connection.Options;
_prefix = options.TableNamePrefix;
_dbConnection = new MySqlConnection(options.ConnectionString);
}
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message);
}
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var sql =
$"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message);
}
public Task CommitAsync()
{
_dbConnection.Close();
_dbConnection.Dispose();
return Task.CompletedTask;
}
public void Dispose()
{
_dbConnection.Dispose();
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlDataStorage : IDataStorage
{
private readonly IOptions<MySqlOptions> _options;
private readonly IOptions<CapOptions> _capOptions;
public MySqlDataStorage(IOptions<MySqlOptions> options, IOptions<CapOptions> capOptions)
{
_options = options;
_capOptions = capOptions;
}
public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
var sql = $"UPDATE `{_options.Value.TableNamePrefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
Retries = message.Retries,
ExpiresAt = message.ExpiresAt,
StatusName = state.ToString("G")
});
}
}
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
var sql = $"UPDATE `{_options.Value.TableNamePrefix}.received` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
Retries = message.Retries,
ExpiresAt = message.ExpiresAt,
StatusName = state.ToString("G")
});
}
}
public async Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null, CancellationToken cancellationToken = default)
{
var sql = $"INSERT INTO `{_options.Value.TableNamePrefix}.published`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var message = new MediumMessage()
{
DbId = content.GetId(),
Origin = content,
Added = DateTime.Now,
ExpiresAt = null,
Retries = 0
};
var po = new
{
Id = message.DbId,
Name = name,
Content = StringSerializer.Serialize(message.Origin),
Retries = message.Retries,
Added = message.Added,
ExpiresAt = message.ExpiresAt,
StatusName = StatusName.Scheduled
};
if (dbTransaction == null)
{
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql, po);
}
}
else
{
var dbTrans = dbTransaction as IDbTransaction;
if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans)
{
dbTrans = dbContextTrans.GetDbTransaction();
}
var conn = dbTrans?.Connection;
await conn.ExecuteAsync(sql, po, dbTrans);
}
return message;
}
public async Task<MediumMessage> StoreMessageAsync(string name, string group, Message content, CancellationToken cancellationToken = default)
{
var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var message = new MediumMessage()
{
DbId = SnowflakeId.Default().NextId().ToString(),
Origin = content,
Added = DateTime.Now,
ExpiresAt = null,
Retries = 0
};
var po = new
{
Id = message.DbId,
Group = group,
Name = name,
Content = StringSerializer.Serialize(message.Origin),
Retries = message.Retries,
Added = message.Added,
ExpiresAt = message.ExpiresAt,
StatusName = StatusName.Scheduled
};
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql, po);
}
return message;
}
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default)
{
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
return await connection.ExecuteAsync(
$@"DELETE FROM `{table}` WHERE ExpiresAt < @timeout limit @batchCount;",
new { timeout, batchCount });
}
}
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql = $"SELECT * FROM `{_options.Value.TableNamePrefix}.published` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
var result = new List<MediumMessage>();
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
result.Add(new MediumMessage()
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}
}
return result;
}
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM `{_options.Value.TableNamePrefix}.received` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
var result = new List<MediumMessage>();
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
result.Add(new MediumMessage()
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}
}
return result;
}
//public Task<CapPublishedMessage> GetPublishedMessageAsync(long id)
//{
// var sql = $@"SELECT * FROM `{_prefix}.published` WHERE `Id`={id};";
// using (var connection = new MySqlConnection(Options.ConnectionString))
// {
// return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
// }
//}
//public Task<CapReceivedMessage> GetReceivedMessageAsync(long id)
//{
// var sql =
// $@"SELECT * FROM `{_prefix}.received` WHERE Id={id};";
// using (var connection = new MySqlConnection(Options.ConnectionString))
// {
// return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
// }
//}
}
}
......@@ -7,7 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
......
......@@ -9,7 +9,7 @@ using Dapper;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.PostgreSql
......
......@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
using Npgsql;
......
......@@ -5,7 +5,7 @@ using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
......
......@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.RabbitMQ;
using Microsoft.Extensions.DependencyInjection;
......@@ -24,8 +25,7 @@ namespace DotNetCore.CAP
services.Configure(_configure);
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddSingleton<IConnectionChannelPool, ConnectionChannelPool>();
services.AddSingleton<IPublishExecutor, RabbitMQPublishMessageSender>();
services.AddSingleton<IPublishMessageSender, RabbitMQPublishMessageSender>();
}
}
}
\ No newline at end of file
......@@ -2,53 +2,50 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Text;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQPublishMessageSender : BasePublishMessageSender
internal sealed class RabbitMQMessageSender : ITransport
{
private readonly IConnectionChannelPool _connectionChannelPool;
private readonly ILogger _logger;
private readonly string _exchange;
public RabbitMQPublishMessageSender(
ILogger<RabbitMQPublishMessageSender> logger,
IOptions<CapOptions> options,
IStorageConnection connection,
IConnectionChannelPool connectionChannelPool,
IStateChanger stateChanger)
: base(logger, options, connection, stateChanger)
public RabbitMQMessageSender(
ILogger<RabbitMQMessageSender> logger,
IConnectionChannelPool connectionChannelPool)
{
_logger = logger;
_connectionChannelPool = connectionChannelPool;
_exchange = _connectionChannelPool.Exchange;
}
protected override string ServersAddress => _connectionChannelPool.HostAddress;
public string Address => _connectionChannelPool.HostAddress;
public override Task<OperateResult> PublishAsync(string keyName, string content)
public Task<OperateResult> SendAsync(TransportMessage message)
{
var channel = _connectionChannelPool.Rent();
try
{
var body = Encoding.UTF8.GetBytes(content);
var props = new BasicProperties()
var props = new BasicProperties
{
DeliveryMode = 2
DeliveryMode = 2,
Headers = message.Headers.ToDictionary(x => x.Key, x => (object)x.Value)
};
channel.ExchangeDeclare(_exchange, RabbitMQOptions.ExchangeType, true);
channel.BasicPublish(_exchange, keyName, props, body);
_logger.LogDebug($"RabbitMQ topic message [{keyName}] has been published. Body: {content}");
channel.BasicPublish(_exchange, message.GetName(), props, message.Body);
_logger.LogDebug($"RabbitMQ topic message [{message.GetName()}] has been published.");
return Task.FromResult(OperateResult.Success);
}
......
......@@ -3,11 +3,13 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Linq;
using System.Threading;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Headers = DotNetCore.CAP.Messages.Headers;
namespace DotNetCore.CAP.RabbitMQ
{
......@@ -34,7 +36,7 @@ namespace DotNetCore.CAP.RabbitMQ
_exchangeName = connectionChannelPool.Exchange;
}
public event EventHandler<MessageContext> OnMessageReceived;
public event EventHandler<TransportMessage> OnMessageReceived;
public event EventHandler<LogMessageEventArgs> OnLog;
......@@ -160,12 +162,13 @@ namespace DotNetCore.CAP.RabbitMQ
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
{
_deliveryTag = e.DeliveryTag;
var message = new MessageContext
{
Group = _queueName,
Name = e.RoutingKey,
Content = Encoding.UTF8.GetString(e.Body)
};
var header = e.BasicProperties.Headers
.ToDictionary(x => x.Key, x => x.Value.ToString());
header.Add(Headers.Group, _queueName);
var message = new TransportMessage(header, e.Body);
OnMessageReceived?.Invoke(sender, message);
}
......@@ -176,6 +179,7 @@ namespace DotNetCore.CAP.RabbitMQ
LogType = MqLogType.ConsumerShutdown,
Reason = e.ReplyText
};
OnLog?.Invoke(sender, args);
}
......
......@@ -6,7 +6,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Reflection;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.SqlServer.Diagnostics
{
......
......@@ -5,7 +5,7 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.SqlServer.Diagnostics
{
......
......@@ -8,7 +8,7 @@ using System.Threading;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
......
......@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
......
......@@ -9,7 +9,7 @@ using Dapper;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
......
......@@ -7,7 +7,7 @@ using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
......
......@@ -6,7 +6,7 @@ using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.SqlServer
{
......
......@@ -2,32 +2,32 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.Abstractions
{
public abstract class CapPublisherBase : ICapPublisher
public class CapPublisher : ICapPublisher
{
private readonly IMessagePacker _msgPacker;
private readonly IContentSerializer _serializer;
private readonly IDispatcher _dispatcher;
private readonly IDataStorage _storage;
// ReSharper disable once InconsistentNaming
protected static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
protected CapPublisherBase(IServiceProvider service)
protected CapPublisher(IServiceProvider service)
{
ServiceProvider = service;
_dispatcher = service.GetRequiredService<IDispatcher>();
_msgPacker = service.GetRequiredService<IMessagePacker>();
_serializer = service.GetRequiredService<IContentSerializer>();
_storage = service.GetRequiredService<IDataStorage>();
Transaction = new AsyncLocal<ICapTransaction>();
}
......@@ -35,58 +35,56 @@ namespace DotNetCore.CAP.Abstractions
public AsyncLocal<ICapTransaction> Transaction { get; }
public void Publish<T>(string name, T contentObj, string callbackName = null)
public async Task PublishAsync<T>(string name, T value,
IDictionary<string, string> optionHeaders,
CancellationToken cancellationToken = default)
{
var message = new CapPublishedMessage
if (string.IsNullOrEmpty(name))
{
Id = SnowflakeId.Default().NextId(),
Name = name,
Content = Serialize(contentObj, callbackName),
StatusName = StatusName.Scheduled
};
PublishAsyncInternal(message).GetAwaiter().GetResult();
throw new ArgumentNullException(nameof(name));
}
public async Task PublishAsync<T>(string name, T contentObj, string callbackName = null,
CancellationToken cancellationToken = default(CancellationToken))
{
var message = new CapPublishedMessage
if (optionHeaders == null)
{
Id = SnowflakeId.Default().NextId(),
Name = name,
Content = Serialize(contentObj, callbackName),
StatusName = StatusName.Scheduled
};
await PublishAsyncInternal(message);
optionHeaders = new Dictionary<string, string>();
}
protected async Task PublishAsyncInternal(CapPublishedMessage message)
var messageId = SnowflakeId.Default().NextId().ToString();
optionHeaders.Add(Headers.MessageId, messageId);
if (!optionHeaders.ContainsKey(Headers.CorrelationId))
{
var operationId = default(Guid);
optionHeaders.Add(Headers.CorrelationId, messageId);
optionHeaders.Add(Headers.CorrelationSequence, 0.ToString());
}
optionHeaders.Add(Headers.MessageName, name);
optionHeaders.Add(Headers.Type, typeof(T).ToString());
optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString());
var message = new Message(optionHeaders, value);
var operationId = default(Guid);
try
{
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);
if (Transaction.Value?.DbTransaction == null)
{
await ExecuteAsync(message);
var mediumMessage = await _storage.StoreMessageAsync(name, message, cancellationToken: cancellationToken);
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
_dispatcher.EnqueueToPublish(message);
_dispatcher.EnqueueToPublish(mediumMessage);
}
else
{
var transaction = (CapTransactionBase)Transaction.Value;
await ExecuteAsync(message, transaction);
var mediumMessage = await _storage.StoreMessageAsync(name, message, transaction, cancellationToken);
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
transaction.AddToSent(message);
transaction.AddToSent(mediumMessage);
if (transaction.AutoCommit)
{
transaction.Commit();
......@@ -96,33 +94,24 @@ namespace DotNetCore.CAP.Abstractions
catch (Exception e)
{
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
throw;
}
}
protected abstract Task ExecuteAsync(CapPublishedMessage message,
ICapTransaction transaction = null,
CancellationToken cancel = default(CancellationToken));
protected virtual string Serialize<T>(T obj, string callbackName = null)
{
string content;
if (obj != null)
public void Publish<T>(string name, T value, string callbackName = null)
{
content = Helper.IsComplexType(obj.GetType())
? _serializer.Serialize(obj)
: obj.ToString();
PublishAsync(name, value, callbackName).GetAwaiter().GetResult();
}
else
public Task PublishAsync<T>(string name, T value, string callbackName = null,
CancellationToken cancellationToken = default)
{
content = string.Empty;
}
var message = new CapMessageDto(content)
var header = new Dictionary<string, string>
{
CallbackName = callbackName
{Headers.CallbackName, callbackName}
};
return _msgPacker.Pack(message);
return PublishAsync(name, value, header, cancellationToken);
}
}
}
\ No newline at end of file
......@@ -2,7 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Abstractions
{
......
......@@ -3,8 +3,6 @@
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Dashboard.GatewayProxy;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
......@@ -20,29 +18,29 @@ namespace Microsoft.AspNetCore.Builder
/// </summary>
/// <param name="app">The <see cref="IApplicationBuilder" /> instance this method extends.</param>
/// <returns>The <see cref="IApplicationBuilder" /> instance this method extends.</returns>
public static IApplicationBuilder UseCapDashboard(this IApplicationBuilder app)
{
if (app == null)
{
throw new ArgumentNullException(nameof(app));
}
//public static IApplicationBuilder UseCapDashboard(this IApplicationBuilder app)
//{
// if (app == null)
// {
// throw new ArgumentNullException(nameof(app));
// }
CheckRequirement(app);
// CheckRequirement(app);
var provider = app.ApplicationServices;
// var provider = app.ApplicationServices;
if (provider.GetService<DashboardOptions>() != null)
{
if (provider.GetService<DiscoveryOptions>() != null)
{
app.UseMiddleware<GatewayProxyMiddleware>();
}
// if (provider.GetService<DashboardOptions>() != null)
// {
// if (provider.GetService<DiscoveryOptions>() != null)
// {
// app.UseMiddleware<GatewayProxyMiddleware>();
// }
app.UseMiddleware<DashboardMiddleware>();
}
// app.UseMiddleware<DashboardMiddleware>();
// }
return app;
}
// return app;
//}
private static void CheckRequirement(IApplicationBuilder app)
{
......@@ -69,16 +67,16 @@ namespace Microsoft.AspNetCore.Builder
}
}
sealed class CapStartupFilter : IStartupFilter
{
public Action<IApplicationBuilder> Configure(Action<IApplicationBuilder> next)
{
return app =>
{
app.UseCapDashboard();
//sealed class CapStartupFilter : IStartupFilter
//{
// public Action<IApplicationBuilder> Configure(Action<IApplicationBuilder> next)
// {
// return app =>
// {
// app.UseCapDashboard();
next(app);
};
}
}
// next(app);
// };
// }
//}
}
\ No newline at end of file
......@@ -4,7 +4,8 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
// ReSharper disable InconsistentNaming
namespace DotNetCore.CAP
......@@ -52,7 +53,7 @@ namespace DotNetCore.CAP
/// <summary>
/// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals <see cref="FailedRetryCount"/> times.
/// </summary>
public Action<MessageType, string, string> FailedThresholdCallback { get; set; }
public Action<MessageType, Message> FailedThresholdCallback { get; set; }
/// <summary>
/// The number of message retries, the retry will stop when the threshold is reached.
......
......@@ -6,7 +6,6 @@ using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection.Extensions;
......@@ -44,7 +43,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<IConsumerServiceSelector, DefaultConsumerServiceSelector>();
services.TryAddSingleton<IModelBinderFactory, ModelBinderFactory>();
services.TryAddSingleton<ICallbackMessageSender, CallbackMessageSender>();
//services.TryAddSingleton<ICallbackMessageSender, CallbackMessageSender>();
services.TryAddSingleton<IConsumerInvokerFactory, ConsumerInvokerFactory>();
services.TryAddSingleton<MethodMatcherCache>();
......@@ -53,13 +52,13 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, CapProcessingServer>());
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, ConsumerRegister>());
services.TryAddSingleton<IStateChanger, StateChanger>();
//Queue's message processor
services.TryAddSingleton<MessageNeedToRetryProcessor>();
services.TryAddSingleton<TransportCheckProcessor>();
//Sender and Executors
services.AddSingleton<IMessageSender, MessageSender>();
services.TryAddSingleton<IDispatcher, Dispatcher>();
// Warning: IPublishMessageSender need to inject at extension project.
services.TryAddSingleton<ISubscriberExecutor, DefaultSubscriberExecutor>();
......@@ -74,7 +73,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.Configure(setupAction);
//Startup and Hosted
services.AddTransient<IStartupFilter, CapStartupFilter>();
//services.AddTransient<IStartupFilter, CapStartupFilter>();
services.AddHostedService<DefaultBootstrapper>();
return new CapBuilder(services);
......
......@@ -4,8 +4,7 @@
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Diagnostics
{
......@@ -18,28 +17,28 @@ namespace DotNetCore.CAP.Diagnostics
private const string CapPrefix = "DotNetCore.CAP.";
public const string CapBeforePublishMessageStore = CapPrefix + nameof(WritePublishMessageStoreBefore);
public const string CapAfterPublishMessageStore = CapPrefix + nameof(WritePublishMessageStoreAfter);
public const string CapErrorPublishMessageStore = CapPrefix + nameof(WritePublishMessageStoreError);
public const string CapBeforePublishMessageStore = CapPrefix + "WritePublishMessageStoreBefore";
public const string CapAfterPublishMessageStore = CapPrefix + "WritePublishMessageStoreAfter";
public const string CapErrorPublishMessageStore = CapPrefix + "WritePublishMessageStoreError";
public const string CapBeforePublish = CapPrefix + nameof(WritePublishBefore);
public const string CapAfterPublish = CapPrefix + nameof(WritePublishAfter);
public const string CapErrorPublish = CapPrefix + nameof(WritePublishError);
public const string CapBeforePublish = CapPrefix + "WritePublishBefore";
public const string CapAfterPublish = CapPrefix + "WritePublishAfter";
public const string CapErrorPublish = CapPrefix + "WritePublishError";
public const string CapBeforeConsume = CapPrefix + nameof(WriteConsumeBefore);
public const string CapAfterConsume = CapPrefix + nameof(WriteConsumeAfter);
public const string CapErrorConsume = CapPrefix + nameof(WriteConsumeError);
public const string CapBeforeConsume = CapPrefix + "WriteConsumeBefore";
public const string CapAfterConsume = CapPrefix + "WriteConsumeAfter";
public const string CapErrorConsume = CapPrefix + "WriteConsumeError";
public const string CapBeforeSubscriberInvoke = CapPrefix + nameof(WriteSubscriberInvokeBefore);
public const string CapAfterSubscriberInvoke = CapPrefix + nameof(WriteSubscriberInvokeAfter);
public const string CapErrorSubscriberInvoke = CapPrefix + nameof(WriteSubscriberInvokeError);
public const string CapBeforeSubscriberInvoke = CapPrefix + "WriteSubscriberInvokeBefore";
public const string CapAfterSubscriberInvoke = CapPrefix + "WriteSubscriberInvokeAfter";
public const string CapErrorSubscriberInvoke = CapPrefix + "WriteSubscriberInvokeError";
//============================================================================
//==================== Before publish store message ====================
//============================================================================
public static Guid WritePublishMessageStoreBefore(this DiagnosticListener @this,
CapPublishedMessage message,
Message message,
[CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapBeforePublishMessageStore))
......@@ -50,8 +49,7 @@ namespace DotNetCore.CAP.Diagnostics
{
OperationId = operationId,
Operation = operation,
MessageName = message.Name,
MessageContent = message.Content
Message = message
});
return operationId;
......@@ -62,7 +60,7 @@ namespace DotNetCore.CAP.Diagnostics
public static void WritePublishMessageStoreAfter(this DiagnosticListener @this,
Guid operationId,
CapPublishedMessage message,
Message message,
[CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapAfterPublishMessageStore))
......@@ -71,9 +69,7 @@ namespace DotNetCore.CAP.Diagnostics
{
OperationId = operationId,
Operation = operation,
MessageId = message.Id,
MessageName = message.Name,
MessageContent = message.Content,
Message = message,
Timestamp = Stopwatch.GetTimestamp()
});
}
......@@ -81,7 +77,7 @@ namespace DotNetCore.CAP.Diagnostics
public static void WritePublishMessageStoreError(this DiagnosticListener @this,
Guid operationId,
CapPublishedMessage message,
Message message,
Exception ex,
[CallerMemberName] string operation = "")
{
......@@ -91,8 +87,7 @@ namespace DotNetCore.CAP.Diagnostics
{
OperationId = operationId,
Operation = operation,
MessageName = message.Name,
MessageContent = message.Content,
Message = message,
Exception = ex,
Timestamp = Stopwatch.GetTimestamp()
});
......@@ -100,135 +95,135 @@ namespace DotNetCore.CAP.Diagnostics
}
//============================================================================
//==================== Publish ====================
//============================================================================
public static void WritePublishBefore(this DiagnosticListener @this, BrokerPublishEventData eventData)
{
if (@this.IsEnabled(CapBeforePublish))
{
eventData.Headers = new TracingHeaders();
@this.Write(CapBeforePublish, eventData);
}
}
////============================================================================
////==================== Publish ====================
////============================================================================
//public static void WritePublishBefore(this DiagnosticListener @this, BrokerPublishEventData eventData)
//{
// if (@this.IsEnabled(CapBeforePublish))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapBeforePublish, eventData);
// }
//}
public static void WritePublishAfter(this DiagnosticListener @this, BrokerPublishEndEventData eventData)
{
if (@this.IsEnabled(CapAfterPublish))
{
eventData.Headers = new TracingHeaders();
@this.Write(CapAfterPublish, eventData);
}
}
//public static void WritePublishAfter(this DiagnosticListener @this, BrokerPublishEndEventData eventData)
//{
// if (@this.IsEnabled(CapAfterPublish))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapAfterPublish, eventData);
// }
//}
public static void WritePublishError(this DiagnosticListener @this, BrokerPublishErrorEventData eventData)
{
if (@this.IsEnabled(CapErrorPublish))
{
eventData.Headers = new TracingHeaders();
@this.Write(CapErrorPublish, eventData);
}
}
//public static void WritePublishError(this DiagnosticListener @this, BrokerPublishErrorEventData eventData)
//{
// if (@this.IsEnabled(CapErrorPublish))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapErrorPublish, eventData);
// }
//}
//============================================================================
//==================== Consume ====================
//============================================================================
public static Guid WriteConsumeBefore(this DiagnosticListener @this, BrokerConsumeEventData eventData)
{
if (@this.IsEnabled(CapBeforeConsume))
{
eventData.Headers = new TracingHeaders();
@this.Write(CapBeforeConsume, eventData);
}
return Guid.Empty;
}
public static void WriteConsumeAfter(this DiagnosticListener @this, BrokerConsumeEndEventData eventData)
{
if (@this.IsEnabled(CapAfterConsume))
{
eventData.Headers = new TracingHeaders();
@this.Write(CapAfterConsume, eventData);
}
}
public static void WriteConsumeError(this DiagnosticListener @this, BrokerConsumeErrorEventData eventData)
{
if (@this.IsEnabled(CapErrorConsume))
{
eventData.Headers = new TracingHeaders();
@this.Write(CapErrorConsume, eventData);
}
}
//public static Guid WriteConsumeBefore(this DiagnosticListener @this, BrokerConsumeEventData eventData)
//{
// if (@this.IsEnabled(CapBeforeConsume))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapBeforeConsume, eventData);
// }
// return Guid.Empty;
//}
//public static void WriteConsumeAfter(this DiagnosticListener @this, BrokerConsumeEndEventData eventData)
//{
// if (@this.IsEnabled(CapAfterConsume))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapAfterConsume, eventData);
// }
//}
//public static void WriteConsumeError(this DiagnosticListener @this, BrokerConsumeErrorEventData eventData)
//{
// if (@this.IsEnabled(CapErrorConsume))
// {
// eventData.Headers = new TracingHeaders();
// @this.Write(CapErrorConsume, eventData);
// }
//}
//============================================================================
//==================== SubscriberInvoke ====================
//============================================================================
public static Guid WriteSubscriberInvokeBefore(this DiagnosticListener @this,
ConsumerContext context,
[CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapBeforeSubscriberInvoke))
{
var operationId = Guid.NewGuid();
var methodName = context.ConsumerDescriptor.MethodInfo.Name;
var subscribeName = context.ConsumerDescriptor.Attribute.Name;
var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
var parameterValues = context.DeliverMessage.Content;
@this.Write(CapBeforeSubscriberInvoke, new SubscriberInvokeEventData(operationId, operation, methodName,
subscribeName,
subscribeGroup, parameterValues, DateTimeOffset.UtcNow));
return operationId;
}
return Guid.Empty;
}
public static void WriteSubscriberInvokeAfter(this DiagnosticListener @this,
Guid operationId,
ConsumerContext context,
DateTimeOffset startTime,
TimeSpan duration,
[CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapAfterSubscriberInvoke))
{
var methodName = context.ConsumerDescriptor.MethodInfo.Name;
var subscribeName = context.ConsumerDescriptor.Attribute.Name;
var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
var parameterValues = context.DeliverMessage.Content;
@this.Write(CapAfterSubscriberInvoke, new SubscriberInvokeEndEventData(operationId, operation, methodName,
subscribeName,
subscribeGroup, parameterValues, startTime, duration));
}
}
public static void WriteSubscriberInvokeError(this DiagnosticListener @this,
Guid operationId,
ConsumerContext context,
Exception ex,
DateTimeOffset startTime,
TimeSpan duration,
[CallerMemberName] string operation = "")
{
if (@this.IsEnabled(CapErrorSubscriberInvoke))
{
var methodName = context.ConsumerDescriptor.MethodInfo.Name;
var subscribeName = context.ConsumerDescriptor.Attribute.Name;
var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
var parameterValues = context.DeliverMessage.Content;
@this.Write(CapErrorSubscriberInvoke, new SubscriberInvokeErrorEventData(operationId, operation, methodName,
subscribeName,
subscribeGroup, parameterValues, ex, startTime, duration));
}
}
//public static Guid WriteSubscriberInvokeBefore(this DiagnosticListener @this,
// ConsumerContext context,
// [CallerMemberName] string operation = "")
//{
// if (@this.IsEnabled(CapBeforeSubscriberInvoke))
// {
// var operationId = Guid.NewGuid();
// var methodName = context.ConsumerDescriptor.MethodInfo.Name;
// var subscribeName = context.ConsumerDescriptor.Attribute.Name;
// var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
// var values = context.DeliverMessage.Value;
// @this.Write(CapBeforeSubscriberInvoke, new SubscriberInvokeEventData(operationId, operation, methodName,
// subscribeName,
// subscribeGroup, parameterValues, DateTimeOffset.UtcNow));
// return operationId;
// }
// return Guid.Empty;
//}
//public static void WriteSubscriberInvokeAfter(this DiagnosticListener @this,
// Guid operationId,
// ConsumerContext context,
// DateTimeOffset startTime,
// TimeSpan duration,
// [CallerMemberName] string operation = "")
//{
// if (@this.IsEnabled(CapAfterSubscriberInvoke))
// {
// var methodName = context.ConsumerDescriptor.MethodInfo.Name;
// var subscribeName = context.ConsumerDescriptor.Attribute.Name;
// var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
// var values = context.DeliverMessage.Value;
// @this.Write(CapAfterSubscriberInvoke, new SubscriberInvokeEndEventData(operationId, operation, methodName,
// subscribeName,
// subscribeGroup, parameterValues, startTime, duration));
// }
//}
//public static void WriteSubscriberInvokeError(this DiagnosticListener @this,
// Guid operationId,
// ConsumerContext context,
// Exception ex,
// DateTimeOffset startTime,
// TimeSpan duration,
// [CallerMemberName] string operation = "")
//{
// if (@this.IsEnabled(CapErrorSubscriberInvoke))
// {
// var methodName = context.ConsumerDescriptor.MethodInfo.Name;
// var subscribeName = context.ConsumerDescriptor.Attribute.Name;
// var subscribeGroup = context.ConsumerDescriptor.Attribute.Group;
// var parameterValues = context.DeliverMessage.Content;
// @this.Write(CapErrorSubscriberInvoke, new SubscriberInvokeErrorEventData(operationId, operation, methodName,
// subscribeName,
// subscribeGroup, parameterValues, ex, startTime, duration));
// }
//}
}
}
\ No newline at end of file
......@@ -2,18 +2,26 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Diagnostics
{
public class BrokerConsumeEventData : BrokerEventData
public class BrokerConsumeEventData
{
public BrokerConsumeEventData(Guid operationId, string operation, string brokerAddress,
string brokerTopicName, string brokerTopicBody, DateTimeOffset startTime)
: base(operationId, operation, brokerAddress, brokerTopicName, brokerTopicBody)
public BrokerConsumeEventData(Guid operationId,string brokerAddress, TransportMessage message, DateTimeOffset startTime)
{
OperationId = operationId;
StartTime = startTime;
BrokerAddress = brokerAddress;
Message = message;
}
public Guid OperationId { get; set; }
public string BrokerAddress { get; set; }
public TransportMessage Message { get; set; }
public DateTimeOffset StartTime { get; }
}
}
\ No newline at end of file
......@@ -2,15 +2,14 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Diagnostics
{
public class BrokerConsumeEndEventData : BrokerConsumeEventData
{
public BrokerConsumeEndEventData(Guid operationId, string operation, string brokerAddress,
string brokerTopicName,
string brokerTopicBody, DateTimeOffset startTime, TimeSpan duration)
: base(operationId, operation, brokerAddress, brokerTopicName, brokerTopicBody, startTime)
public BrokerConsumeEndEventData(Guid operationId, string operation, string brokerAddress, TransportMessage message, DateTimeOffset startTime, TimeSpan duration)
: base(operationId, brokerAddress, message, startTime)
{
Duration = duration;
}
......
......@@ -2,19 +2,26 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Diagnostics
{
public class BrokerConsumeErrorEventData : BrokerConsumeEndEventData, IErrorEventData
public class BrokerConsumeErrorEventData : IErrorEventData
{
public BrokerConsumeErrorEventData(Guid operationId, string operation, string brokerAddress,
string brokerTopicName, string brokerTopicBody, Exception exception, DateTimeOffset startTime,
TimeSpan duration)
: base(operationId, operation, brokerAddress, brokerTopicName, brokerTopicBody, startTime, duration)
public BrokerConsumeErrorEventData(Guid operationId, string brokerAddress, TransportMessage message, Exception exception)
{
OperationId = operationId;
BrokerAddress = brokerAddress;
Message = message;
Exception = exception;
}
public Guid OperationId { get; set; }
public string BrokerAddress { get; }
public TransportMessage Message { get; }
public Exception Exception { get; }
}
}
\ No newline at end of file
......@@ -2,14 +2,15 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Diagnostics
{
public class BrokerPublishEventData : BrokerEventData
{
public BrokerPublishEventData(Guid operationId, string operation, string brokerAddress,
string brokerTopicName, string brokerTopicBody, DateTimeOffset startTime)
: base(operationId, operation, brokerAddress, brokerTopicName, brokerTopicBody)
Message message , DateTimeOffset startTime)
: base(operationId, operation, brokerAddress, message)
{
StartTime = startTime;
}
......
......@@ -2,15 +2,15 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Diagnostics
{
public class BrokerPublishEndEventData : BrokerPublishEventData
{
public BrokerPublishEndEventData(Guid operationId, string operation, string brokerAddress,
string brokerTopicName,
string brokerTopicBody, DateTimeOffset startTime, TimeSpan duration)
: base(operationId, operation, brokerAddress, brokerTopicName, brokerTopicBody, startTime)
Message message, DateTimeOffset startTime, TimeSpan duration)
: base(operationId, operation, brokerAddress, message, startTime)
{
Duration = duration;
}
......
......@@ -2,15 +2,15 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Diagnostics
{
public class BrokerPublishErrorEventData : BrokerPublishEndEventData, IErrorEventData
{
public BrokerPublishErrorEventData(Guid operationId, string operation, string brokerAddress,
string brokerTopicName, string brokerTopicBody, Exception exception, DateTimeOffset startTime,
TimeSpan duration)
: base(operationId, operation, brokerAddress, brokerTopicName, brokerTopicBody, startTime, duration)
Message message, Exception exception, DateTimeOffset startTime, TimeSpan duration)
: base(operationId, operation, brokerAddress, message, startTime, duration)
{
Exception = exception;
}
......
......@@ -2,26 +2,22 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Diagnostics
{
public class BrokerEventData : EventData
{
public BrokerEventData(Guid operationId, string operation, string brokerAddress,
string brokerTopicName, string brokerTopicBody)
public BrokerEventData(Guid operationId, string operation, string brokerAddress, Message message)
: base(operationId, operation)
{
BrokerAddress = brokerAddress;
BrokerTopicName = brokerTopicName;
BrokerTopicBody = brokerTopicBody;
}
public TracingHeaders Headers { get; set; }
Message = message;
}
public string BrokerAddress { get; set; }
public string BrokerTopicBody { get; set; }
public string BrokerTopicName { get; set; }
public Message Message { get; set; }
}
}
\ No newline at end of file
......@@ -12,14 +12,13 @@ namespace DotNetCore.CAP.Diagnostics
string methodName,
string subscribeName,
string subscribeGroup,
string parameterValues,
object values,
DateTimeOffset startTime)
: base(operationId, operation)
{
MethodName = methodName;
SubscribeName = subscribeName;
SubscribeGroup = subscribeGroup;
ParameterValues = parameterValues;
StartTime = startTime;
}
......@@ -31,6 +30,6 @@ namespace DotNetCore.CAP.Diagnostics
public string SubscribeGroup { get; set; }
public string ParameterValues { get; set; }
public string Values { get; set; }
}
}
\ No newline at end of file
......@@ -9,27 +9,6 @@
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>
<ItemGroup>
<EmbeddedResource Include="Dashboard\Content\css\bootstrap.min.css" />
<EmbeddedResource Include="Dashboard\Content\css\cap.css" />
<EmbeddedResource Include="Dashboard\Content\css\jsonview.min.css" />
<EmbeddedResource Include="Dashboard\Content\css\rickshaw.min.css" />
<EmbeddedResource Include="Dashboard\Content\fonts\glyphicons-halflings-regular.eot" />
<EmbeddedResource Include="Dashboard\Content\fonts\glyphicons-halflings-regular.svg" />
<EmbeddedResource Include="Dashboard\Content\fonts\glyphicons-halflings-regular.ttf" />
<EmbeddedResource Include="Dashboard\Content\fonts\glyphicons-halflings-regular.woff" />
<EmbeddedResource Include="Dashboard\Content\fonts\glyphicons-halflings-regular.woff2" />
<EmbeddedResource Include="Dashboard\Content\js\bootstrap.min.js" />
<EmbeddedResource Include="Dashboard\Content\js\d3.layout.min.js" />
<EmbeddedResource Include="Dashboard\Content\js\d3.min.js" />
<EmbeddedResource Include="Dashboard\Content\js\cap.js" />
<EmbeddedResource Include="Dashboard\Content\js\jquery-2.1.4.min.js" />
<EmbeddedResource Include="Dashboard\Content\js\jsonview.min.js" />
<EmbeddedResource Include="Dashboard\Content\js\moment-with-locales.min.js" />
<EmbeddedResource Include="Dashboard\Content\js\moment.min.js" />
<EmbeddedResource Include="Dashboard\Content\js\rickshaw.min.js" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Consul" Version="0.7.2.6" />
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.2.0" />
......@@ -37,88 +16,7 @@
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.1" />
</ItemGroup>
<ItemGroup>
<Compile Update="Dashboard\Content\resx\Strings.Designer.cs">
<DesignTime>True</DesignTime>
<AutoGen>True</AutoGen>
<DependentUpon>Strings.resx</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_SidebarMenu.generated.cs">
<DependentUpon>_SidebarMenu.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\SidebarMenu.cs">
<DependentUpon>_SidebarMenu.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\ReceivedPage.generated.cs">
<DependentUpon>ReceivedPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\ReceivedPage.cs">
<DependentUpon>ReceivedPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_BlockMetric.generated.cs">
<DependentUpon>_BlockMetric.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\BlockMetric.cs">
<DependentUpon>_BlockMetric.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\Breadcrumbs.cs">
<DependentUpon>_Breadcrumbs.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_Breadcrumbs.generated.cs">
<DependentUpon>_Breadcrumbs.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_Paginator.generated.cs">
<DependentUpon>_Paginator.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_Paginator.cs">
<DependentUpon>_Paginator.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_PerPageSelector.cs">
<DependentUpon>_PerPageSelector.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_PerPageSelector.generated.cs">
<DependentUpon>_PerPageSelector.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\PublishedPage.cs">
<DependentUpon>PublishedPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\PublishedPage*.cs">
<DependentUpon>PublishedPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\LayoutPage.*.cs">
<DependentUpon>LayoutPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\LayoutPage.cs">
<DependentUpon>LayoutPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\InlineMetric.cs">
<DependentUpon>_InlineMetric.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_InlineMetric.generated.cs">
<DependentUpon>_InlineMetric.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\_Navigation.generated.cs">
<DependentUpon>_Navigation.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\HomePage.generated.cs">
<DependentUpon>HomePage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\HomePage.cs">
<DependentUpon>HomePage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\SubscriberPage.generated.cs">
<DependentUpon>SubscriberPage.cshtml</DependentUpon>
</Compile>
<Compile Update="Dashboard\Pages\NodePage*.cs">
<DependentUpon>NodePage.cshtml</DependentUpon>
</Compile>
</ItemGroup>
<ItemGroup>
<EmbeddedResource Update="Dashboard\Content\resx\Strings.resx">
<Generator>PublicResXFileCodeGenerator</Generator>
<CustomToolNamespace>DotNetCore.CAP.Dashboard.Resources</CustomToolNamespace>
<LastGenOutput>Strings.Designer.cs</LastGenOutput>
</EmbeddedResource>
<Folder Include="Persistence\InMemory\" />
</ItemGroup>
</Project>
\ No newline at end of file
......@@ -5,6 +5,7 @@ using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
......@@ -19,7 +20,7 @@ namespace DotNetCore.CAP
public DefaultBootstrapper(
ILogger<DefaultBootstrapper> logger,
IStorage storage,
IStorageInitializer storage,
IEnumerable<IProcessingServer> processors)
{
_logger = logger;
......@@ -27,7 +28,7 @@ namespace DotNetCore.CAP
Processors = processors;
}
private IStorage Storage { get; }
private IStorageInitializer Storage { get; }
private IEnumerable<IProcessingServer> Processors { get; }
......
......@@ -2,7 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP
{
......
......@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
......@@ -26,7 +27,10 @@ namespace DotNetCore.CAP
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param>
/// <param name="cancellationToken"></param>
Task PublishAsync<T>(string name, T contentObj, string callbackName = null, CancellationToken cancellationToken = default(CancellationToken));
Task PublishAsync<T>(string name, T contentObj, string callbackName = null, CancellationToken cancellationToken = default);
Task PublishAsync<T>(string name, T contentObj, IDictionary<string, string> optionHeaders, CancellationToken cancellationToken = default);
/// <summary>
/// Publish an object message.
......
using System.Collections.Concurrent;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Persistence;
namespace DotNetCore.CAP
{
......@@ -7,19 +7,19 @@ namespace DotNetCore.CAP
{
private readonly IDispatcher _dispatcher;
private readonly ConcurrentQueue<CapPublishedMessage> _bufferList;
private readonly ConcurrentQueue<MediumMessage> _bufferList;
protected CapTransactionBase(IDispatcher dispatcher)
{
_dispatcher = dispatcher;
_bufferList = new ConcurrentQueue<CapPublishedMessage>();
_bufferList = new ConcurrentQueue<MediumMessage>();
}
public bool AutoCommit { get; set; }
public object DbTransaction { get; set; }
protected internal virtual void AddToSent(CapPublishedMessage msg)
protected internal virtual void AddToSent(MediumMessage msg)
{
_bufferList.Enqueue(msg);
}
......@@ -29,6 +29,7 @@ namespace DotNetCore.CAP
while (!_bufferList.IsEmpty)
{
_bufferList.TryDequeue(out var message);
_dispatcher.EnqueueToPublish(message);
}
}
......
......@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP
{
......@@ -36,7 +37,7 @@ namespace DotNetCore.CAP
/// </summary>
void Reject();
event EventHandler<MessageContext> OnMessageReceived;
event EventHandler<TransportMessage> OnMessageReceived;
event EventHandler<LogMessageEventArgs> OnLog;
}
......
......@@ -7,9 +7,10 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
......@@ -17,9 +18,10 @@ namespace DotNetCore.CAP
{
internal class ConsumerRegister : IConsumerRegister
{
private readonly IStorageConnection _connection;
private readonly IConsumerClientFactory _consumerClientFactory;
private readonly IDispatcher _dispatcher;
private readonly ISerializer _serializer;
private readonly IDataStorage _storage;
private readonly ILogger _logger;
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1);
private readonly CapOptions _options;
......@@ -40,7 +42,8 @@ namespace DotNetCore.CAP
IOptions<CapOptions> options,
IConsumerClientFactory consumerClientFactory,
IDispatcher dispatcher,
IStorageConnection connection,
ISerializer serializer,
IDataStorage storage,
ILogger<ConsumerRegister> logger,
MethodMatcherCache selector)
{
......@@ -49,7 +52,8 @@ namespace DotNetCore.CAP
_logger = logger;
_consumerClientFactory = consumerClientFactory;
_dispatcher = dispatcher;
_connection = connection;
_serializer = serializer;
_storage = storage;
_cts = new CancellationTokenSource();
}
......@@ -144,34 +148,28 @@ namespace DotNetCore.CAP
private void RegisterMessageProcessor(IConsumerClient client)
{
client.OnMessageReceived += (sender, messageContext) =>
client.OnMessageReceived += async (sender, messageContext) =>
{
_cts.Token.ThrowIfCancellationRequested();
Guid? operationId = null;
try
{
operationId = TracingBefore(messageContext);
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
var tracingResult = TracingBefore(messageContext.Name, messageContext.Content);
var operationId = tracingResult.Item1;
var messageBody = tracingResult.Item2;
var receivedMessage = new CapReceivedMessage(messageContext)
{
Id = SnowflakeId.Default().NextId(),
StatusName = StatusName.Scheduled,
Content = messageBody
};
try
{
StoreMessage(receivedMessage);
var message = await _serializer.DeserializeAsync(messageContext);
var mediumMessage = await _storage.StoreMessageAsync(message.GetName(), message.GetGroup(), message);
client.Commit();
TracingAfter(operationId, receivedMessage.Name, receivedMessage.Content, startTime,
stopwatch.Elapsed);
if (operationId != null)
{
TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed);
}
_dispatcher.EnqueueToExecute(receivedMessage);
_dispatcher.EnqueueToExecute(mediumMessage);
}
catch (Exception e)
{
......@@ -179,8 +177,10 @@ namespace DotNetCore.CAP
client.Reject();
TracingError(operationId, receivedMessage.Name, receivedMessage.Content, e, startTime,
stopwatch.Elapsed);
if (operationId != null)
{
TracingError(operationId.Value, messageContext, e);
}
}
};
......@@ -217,56 +217,39 @@ namespace DotNetCore.CAP
}
}
private void StoreMessage(CapReceivedMessage receivedMessage)
private Guid? TracingBefore(TransportMessage message)
{
_connection.StoreReceivedMessage(receivedMessage);
}
private (Guid, string) TracingBefore(string topic, string values)
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapBeforeConsume))
{
_logger.LogDebug("CAP received topic message:" + topic);
var operationId = Guid.NewGuid();
Guid operationId = Guid.NewGuid();
var eventData = new BrokerConsumeEventData(operationId, _serverAddress, message, DateTimeOffset.UtcNow);
var eventData = new BrokerConsumeEventData(
operationId, "",
_serverAddress,
topic,
values,
DateTimeOffset.UtcNow);
s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapBeforeConsume, eventData);
s_diagnosticListener.WriteConsumeBefore(eventData);
return operationId;
}
return (operationId, eventData.BrokerTopicBody);
return null;
}
private void TracingAfter(Guid operationId, string topic, string values, DateTimeOffset startTime, TimeSpan du)
private void TracingAfter(Guid operationId, Message message, DateTimeOffset startTime, TimeSpan du)
{
var eventData = new BrokerConsumeEndEventData(
operationId,
"",
_serverAddress,
topic,
values,
startTime,
du);
//if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapAfterConsume))
//{
// var eventData = new BrokerConsumeEndEventData(operationId, "", _serverAddress, message, startTime, du);
s_diagnosticListener.WriteConsumeAfter(eventData);
// s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapAfterConsume, eventData);
//}
}
private void TracingError(Guid operationId, string topic, string values, Exception ex, DateTimeOffset startTime, TimeSpan du)
private void TracingError(Guid operationId, TransportMessage message, Exception ex)
{
var eventData = new BrokerConsumeErrorEventData(
operationId,
"",
_serverAddress,
topic,
values,
ex,
startTime,
du);
s_diagnosticListener.WriteConsumeError(eventData);
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapErrorConsume))
{
var eventData = new BrokerConsumeErrorEventData(operationId, _serverAddress, message, ex);
s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapErrorConsume, eventData);
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence;
namespace DotNetCore.CAP
{
public interface IDispatcher
{
void EnqueueToPublish(CapPublishedMessage message);
void EnqueueToPublish(MediumMessage message);
void EnqueueToExecute(CapReceivedMessage message);
void EnqueueToExecute(MediumMessage message);
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Threading.Tasks;
namespace DotNetCore.CAP
{
/// <summary>
/// publish message excutor. The excutor sends the message to the message queue
/// </summary>
public interface IPublishExecutor
{
/// <summary>
/// publish message to message queue.
/// </summary>
/// <param name="keyName">The message topic name.</param>
/// <param name="content">The message content.</param>
/// <returns></returns>
Task<OperateResult> PublishAsync(string keyName, string content);
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
public interface IPublishMessageSender
{
Task<OperateResult> SendAsync(CapPublishedMessage message);
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Dashboard;
namespace DotNetCore.CAP
{
/// <summary>
/// Represents a persisted storage.
/// </summary>
public interface IStorage
{
/// <summary>
/// Initializes the storage. For example, making sure a database is created and migrations are applied.
/// </summary>
Task InitializeAsync(CancellationToken cancellationToken);
/// <summary>
/// Provider the dashboard metric api.
/// </summary>
IMonitoringApi GetMonitoringApi();
/// <summary>
/// Storage connection of database operate.
/// </summary>
IStorageConnection GetConnection();
}
}
\ No newline at end of file
......@@ -3,7 +3,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP
{
......@@ -44,10 +44,6 @@ namespace DotNetCore.CAP
/// </summary>
Task<IEnumerable<CapReceivedMessage>> GetReceivedMessagesOfNeedRetry();
/// <summary>
/// Creates and returns an <see cref="IStorageTransaction" />.
/// </summary>
IStorageTransaction CreateTransaction();
/// <summary>
/// Change specified message's state of published message
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
/// <summary>
/// A transactional database storage operation.
/// Update message state of the message table with transactional.
/// </summary>
public interface IStorageTransaction : IDisposable
{
void UpdateMessage(CapPublishedMessage message);
void UpdateMessage(CapReceivedMessage message);
Task CommitAsync();
}
}
\ No newline at end of file
......@@ -2,15 +2,16 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
......@@ -18,10 +19,9 @@ namespace DotNetCore.CAP
{
internal class DefaultSubscriberExecutor : ISubscriberExecutor
{
private readonly ICallbackMessageSender _callbackMessageSender;
private readonly IStorageConnection _connection;
private readonly ICapPublisher _sender;
private readonly IDataStorage _dataStorage;
private readonly ILogger _logger;
private readonly IStateChanger _stateChanger;
private readonly CapOptions _options;
private readonly MethodMatcherCache _selector;
......@@ -34,16 +34,14 @@ namespace DotNetCore.CAP
ILogger<DefaultSubscriberExecutor> logger,
IOptions<CapOptions> options,
IConsumerInvokerFactory consumerInvokerFactory,
ICallbackMessageSender callbackMessageSender,
IStateChanger stateChanger,
IStorageConnection connection,
ICapPublisher sender,
IDataStorage dataStorage,
MethodMatcherCache selector)
{
_selector = selector;
_callbackMessageSender = callbackMessageSender;
_sender = sender;
_options = options.Value;
_stateChanger = stateChanger;
_connection = connection;
_dataStorage = dataStorage;
_logger = logger;
Invoker = consumerInvokerFactory.CreateInvoker();
......@@ -51,7 +49,7 @@ namespace DotNetCore.CAP
private IConsumerInvoker Invoker { get; }
public async Task<OperateResult> ExecuteAsync(CapReceivedMessage message, CancellationToken cancellationToken)
public async Task<OperateResult> ExecuteAsync(MediumMessage message, CancellationToken cancellationToken)
{
bool retry;
OperateResult result;
......@@ -69,13 +67,7 @@ namespace DotNetCore.CAP
return result;
}
/// <summary>
/// Execute message consumption once.
/// </summary>
/// <param name="message">the message received of <see cref="CapReceivedMessage"/></param>
/// <param name="cancellationToken"></param>
/// <returns>Item1 is need still retry, Item2 is executed result.</returns>
private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message, CancellationToken cancellationToken)
private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(MediumMessage message, CancellationToken cancellationToken)
{
if (message == null)
{
......@@ -100,35 +92,36 @@ namespace DotNetCore.CAP
}
catch (Exception ex)
{
_logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}");
_logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Origin.GetName()}, Id:{message.DbId}");
return (await SetFailedState(message, ex), OperateResult.Failed(ex));
}
}
private Task SetSuccessfulState(CapReceivedMessage message)
private Task SetSuccessfulState(MediumMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);
return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
message.ExpiresAt = DateTime.Now.AddSeconds(_options.SucceedMessageExpiredAfter);
return _dataStorage.ChangeReceiveStateAsync(message, StatusName.Succeeded);
}
private async Task<bool> SetFailedState(CapReceivedMessage message, Exception ex)
private async Task<bool> SetFailedState(MediumMessage message, Exception ex)
{
if (ex is SubscriberNotFoundException)
{
message.Retries = _options.FailedRetryCount; // not retry if SubscriberNotFoundException
}
AddErrorReasonToContent(message, ex);
//TODO: Add exception to content
// AddErrorReasonToContent(message, ex);
var needRetry = UpdateMessageForRetry(message);
await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
await _dataStorage.ChangeReceiveStateAsync(message, StatusName.Failed);
return needRetry;
}
private bool UpdateMessageForRetry(CapReceivedMessage message)
private bool UpdateMessageForRetry(MediumMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
......@@ -142,9 +135,9 @@ namespace DotNetCore.CAP
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Origin);
_logger.ConsumerExecutedAfterThreshold(message.Id, _options.FailedRetryCount);
_logger.ConsumerExecutedAfterThreshold(message.DbId, _options.FailedRetryCount);
}
catch (Exception ex)
{
......@@ -154,22 +147,24 @@ namespace DotNetCore.CAP
return false;
}
_logger.ConsumerExecutionRetrying(message.Id, retries);
_logger.ConsumerExecutionRetrying(message.DbId, retries);
return true;
}
private static void AddErrorReasonToContent(CapReceivedMessage message, Exception exception)
{
message.Content = Helper.AddExceptionProperty(message.Content, exception);
}
//private static void AddErrorReasonToContent(CapReceivedMessage message, Exception exception)
//{
// message.Content = Helper.AddExceptionProperty(message.Content, exception);
//}
private async Task InvokeConsumerMethodAsync(CapReceivedMessage receivedMessage, CancellationToken cancellationToken)
private async Task InvokeConsumerMethodAsync(MediumMessage message, CancellationToken cancellationToken)
{
if (!_selector.TryGetTopicExecutor(receivedMessage.Name, receivedMessage.Group,
if (!_selector.TryGetTopicExecutor(
message.Origin.GetName(),
message.Origin.GetGroup(),
out var executor))
{
var error = $"Message can not be found subscriber. {receivedMessage} \r\n see: https://github.com/dotnetcore/CAP/issues/63";
var error = $"Message can not be found subscriber. {message} \r\n see: https://github.com/dotnetcore/CAP/issues/63";
throw new SubscriberNotFoundException(error);
}
......@@ -177,20 +172,25 @@ namespace DotNetCore.CAP
var stopwatch = Stopwatch.StartNew();
var operationId = Guid.Empty;
var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext());
var consumerContext = new ConsumerContext(executor, message.Origin);
try
{
operationId = s_diagnosticListener.WriteSubscriberInvokeBefore(consumerContext);
// operationId = s_diagnosticListener.WriteSubscriberInvokeBefore(consumerContext);
var ret = await Invoker.InvokeAsync(consumerContext, cancellationToken);
s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, consumerContext, startTime,
stopwatch.Elapsed);
// s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, consumerContext, startTime,stopwatch.Elapsed);
if (!string.IsNullOrEmpty(ret.CallbackName))
{
await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result);
var header = new Dictionary<string, string>()
{
[Headers.CorrelationId] = message.Origin.GetId(),
[Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString()
};
await _sender.PublishAsync(ret.CallbackName, ret.Result, header, cancellationToken);
}
}
catch (OperationCanceledException)
......@@ -199,7 +199,7 @@ namespace DotNetCore.CAP
}
catch (Exception ex)
{
s_diagnosticListener.WriteSubscriberInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed);
// s_diagnosticListener.WriteSubscriberInvokeError(operationId, consumerContext, ex, startTime, stopwatch.Elapsed);
throw new SubscriberExecutionFailedException(ex.Message, ex);
}
......
......@@ -3,7 +3,7 @@
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Persistence;
namespace DotNetCore.CAP
{
......@@ -12,6 +12,6 @@ namespace DotNetCore.CAP
/// </summary>
public interface ISubscriberExecutor
{
Task<OperateResult> ExecuteAsync(CapReceivedMessage message, CancellationToken cancellationToken = default);
Task<OperateResult> ExecuteAsync(MediumMessage message, CancellationToken cancellationToken = default);
}
}
\ No newline at end of file
......@@ -6,22 +6,10 @@ namespace DotNetCore.CAP.Infrastructure
/// <summary>
/// The message status name.
/// </summary>
public struct StatusName
public enum StatusName
{
public const string Scheduled = nameof(Scheduled);
public const string Succeeded = nameof(Succeeded);
public const string Failed = nameof(Failed);
public static string Standardized(string input)
{
foreach (var item in typeof(StatusName).GetFields())
{
if (item.Name.ToLower() == input.ToLower())
{
return item.Name;
}
}
return string.Empty;
}
Failed = -1,
Scheduled,
Succeeded
}
}
\ No newline at end of file
......@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Internal
{
......@@ -15,7 +16,7 @@ namespace DotNetCore.CAP.Internal
/// </summary>
/// <param name="descriptor">consumer method descriptor. </param>
/// <param name="message"> received message.</param>
public ConsumerContext(ConsumerExecutorDescriptor descriptor, MessageContext message)
public ConsumerContext(ConsumerExecutorDescriptor descriptor, Message message)
{
ConsumerDescriptor = descriptor ?? throw new ArgumentNullException(nameof(descriptor));
DeliverMessage = message ?? throw new ArgumentNullException(nameof(message));
......@@ -29,6 +30,6 @@ namespace DotNetCore.CAP.Internal
/// <summary>
/// consumer received message.
/// </summary>
public MessageContext DeliverMessage { get; }
public Message DeliverMessage { get; }
}
}
\ No newline at end of file
......@@ -10,25 +10,25 @@ namespace DotNetCore.CAP.Internal
internal class ConsumerInvokerFactory : IConsumerInvokerFactory
{
private readonly ILoggerFactory _loggerFactory;
private readonly IMessagePacker _messagePacker;
private readonly IModelBinderFactory _modelBinderFactory;
//private readonly IMessagePacker _messagePacker;
//private readonly IModelBinderFactory _modelBinderFactory;
private readonly IServiceProvider _serviceProvider;
public ConsumerInvokerFactory(
ILoggerFactory loggerFactory,
IMessagePacker messagePacker,
IModelBinderFactory modelBinderFactory,
//IMessagePacker messagePacker,
//IModelBinderFactory modelBinderFactory,
IServiceProvider serviceProvider)
{
_loggerFactory = loggerFactory;
_messagePacker = messagePacker;
_modelBinderFactory = modelBinderFactory;
//_messagePacker = messagePacker;
//_modelBinderFactory = modelBinderFactory;
_serviceProvider = serviceProvider;
}
public IConsumerInvoker CreateInvoker()
{
return new DefaultConsumerInvoker(_loggerFactory, _serviceProvider, _messagePacker, _modelBinderFactory);
return new DefaultConsumerInvoker(_loggerFactory, _serviceProvider);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Internal
{
internal class CallbackMessageSender : ICallbackMessageSender
{
private readonly IContentSerializer _contentSerializer;
private readonly ILogger<CallbackMessageSender> _logger;
private readonly IMessagePacker _messagePacker;
private readonly IServiceProvider _serviceProvider;
public CallbackMessageSender(
ILogger<CallbackMessageSender> logger,
IServiceProvider serviceProvider,
IContentSerializer contentSerializer,
IMessagePacker messagePacker)
{
_logger = logger;
_serviceProvider = serviceProvider;
_contentSerializer = contentSerializer;
_messagePacker = messagePacker;
}
public async Task SendAsync(string messageId, string topicName, object bodyObj)
{
string body;
if (bodyObj != null && Helper.IsComplexType(bodyObj.GetType()))
{
body = _contentSerializer.Serialize(bodyObj);
}
else
{
body = bodyObj?.ToString();
}
_logger.LogDebug($"Callback message will publishing, name:{topicName},content:{body}");
var callbackMessage = new CapMessageDto
{
Id = messageId,
Content = body
};
var content = _messagePacker.Pack(callbackMessage);
var publishedMessage = new CapPublishedMessage
{
Id = SnowflakeId.Default().NextId(),
Name = topicName,
Content = content,
StatusName = StatusName.Scheduled
};
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var callbackPublisher = provider.GetService<ICallbackPublisher>();
await callbackPublisher.PublishCallbackAsync(publishedMessage);
}
}
}
}
\ No newline at end of file
......@@ -5,7 +5,7 @@ using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging;
......@@ -15,18 +15,19 @@ namespace DotNetCore.CAP.Internal
internal class DefaultConsumerInvoker : IConsumerInvoker
{
private readonly ILogger _logger;
private readonly IMessagePacker _messagePacker;
private readonly IModelBinderFactory _modelBinderFactory;
//private readonly IMessagePacker _messagePacker;
//private readonly IModelBinderFactory _modelBinderFactory;
private readonly IServiceProvider _serviceProvider;
public DefaultConsumerInvoker(ILoggerFactory loggerFactory,
IServiceProvider serviceProvider,
IMessagePacker messagePacker,
IModelBinderFactory modelBinderFactory)
IServiceProvider serviceProvider
//IMessagePacker messagePacker,
//IModelBinderFactory modelBinderFactory
)
{
_modelBinderFactory = modelBinderFactory;
//_modelBinderFactory = modelBinderFactory;
_serviceProvider = serviceProvider;
_messagePacker = messagePacker;
//_messagePacker = messagePacker;
_logger = loggerFactory.CreateLogger<DefaultConsumerInvoker>();
}
......@@ -58,20 +59,21 @@ namespace DotNetCore.CAP.Internal
obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, implType);
}
var jsonContent = context.DeliverMessage.Content;
var message = _messagePacker.UnPack(jsonContent);
//var jsonContent = context.DeliverMessage.Content;
//var message = _messagePacker.UnPack(jsonContent);
var message = context.DeliverMessage;
object resultObj;
if (executor.MethodParameters.Length > 0)
{
resultObj = await ExecuteWithParameterAsync(executor, obj, message.Content);
resultObj = await ExecuteWithParameterAsync(executor, obj, message.Value);
}
else
{
resultObj = await ExecuteAsync(executor, obj);
}
return new ConsumerExecutedResult(resultObj, message.Id, message.CallbackName);
return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName());
}
}
......@@ -85,31 +87,35 @@ namespace DotNetCore.CAP.Internal
return executor.Execute(@class);
}
private async Task<object> ExecuteWithParameterAsync(ObjectMethodExecutor executor,
object @class, string parameterString)
private async Task<object> ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object parameter)
{
var firstParameter = executor.MethodParameters[0];
try
{
var binder = _modelBinderFactory.CreateBinder(firstParameter);
var bindResult = await binder.BindModelAsync(parameterString);
if (bindResult.IsSuccess)
{
if (executor.IsMethodAsync)
{
return await executor.ExecuteAsync(@class, bindResult.Model);
}
return executor.Execute(@class, bindResult.Model);
return await executor.ExecuteAsync(@class, parameter);
}
throw new MethodBindException(
$"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameterString} ");
return executor.Execute(@class, parameter);
//var binder = _modelBinderFactory.CreateBinder(firstParameter);
//var bindResult = await binder.BindModelAsync(parameter);
//if (bindResult.IsSuccess)
//{
// if (executor.IsMethodAsync)
// {
// return await executor.ExecuteAsync(@class, bindResult.Model);
// }
// return executor.Execute(@class, bindResult.Model);
//}
//throw new MethodBindException(
// $"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameter} ");
}
catch (FormatException ex)
{
_logger.ModelBinderFormattingException(executor.MethodInfo?.Name, firstParameter.Name, parameterString,
ex);
//_logger.ModelBinderFormattingException(executor.MethodInfo?.Name, firstParameter.Name, parameter, ex);
return null;
}
}
......
......@@ -2,7 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Internal
{
......
......@@ -6,44 +6,43 @@ using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using DotNetCore.CAP.Serialization;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP
namespace DotNetCore.CAP.Internal
{
public abstract class BasePublishMessageSender : IPublishMessageSender, IPublishExecutor
internal class MessageSender : IMessageSender
{
private readonly IStorageConnection _connection;
private readonly IDataStorage _dataStorage;
private readonly ISerializer _serializer;
private readonly ITransport _transport;
private readonly ILogger _logger;
private readonly CapOptions _options;
private readonly IStateChanger _stateChanger;
private readonly IOptions<CapOptions> _options;
protected abstract string ServersAddress { get; }
// diagnostics listener
// ReSharper disable once InconsistentNaming
protected static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
protected BasePublishMessageSender(
protected MessageSender(
ILogger logger,
IOptions<CapOptions> options,
IStorageConnection connection,
IStateChanger stateChanger)
{
_options = options.Value;
_connection = connection;
_stateChanger = stateChanger;
IDataStorage dataStorage,
ISerializer serializer,
ITransport transport)
{
_options = options;
_dataStorage = dataStorage;
_serializer = serializer;
_transport = transport;
_logger = logger;
}
public abstract Task<OperateResult> PublishAsync(string keyName, string content);
public async Task<OperateResult> SendAsync(CapPublishedMessage message)
public async Task<OperateResult> SendAsync(MediumMessage message)
{
bool retry;
OperateResult result;
......@@ -61,78 +60,75 @@ namespace DotNetCore.CAP
return result;
}
private async Task<(bool, OperateResult)> SendWithoutRetryAsync(CapPublishedMessage message)
private async Task<(bool, OperateResult)> SendWithoutRetryAsync(MediumMessage message)
{
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
var tracingResult = TracingBefore(message.Name, message.Content);
var operationId = tracingResult.Item1;
var operationId = TracingBefore(message.Origin);
var sendValues = tracingResult.Item2 != null
? Helper.AddTracingHeaderProperty(message.Content, tracingResult.Item2)
: message.Content;
var result = await PublishAsync(message.Name, sendValues);
var transportMsg = await _serializer.SerializeAsync(message.Origin);
var result = await _transport.SendAsync(transportMsg);
stopwatch.Stop();
if (result.Succeeded)
{
await SetSuccessfulState(message);
TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed);
if (operationId != null)
{
TracingAfter(operationId.Value, message.Origin, startTime, stopwatch.Elapsed);
}
return (false, OperateResult.Success);
}
else
{
TracingError(operationId, message, result, startTime, stopwatch.Elapsed);
if (operationId != null)
{
TracingError(operationId.Value, message.Origin, result, startTime, stopwatch.Elapsed);
}
var needRetry = await SetFailedState(message, result.Exception);
return (needRetry, OperateResult.Failed(result.Exception));
}
}
private Task SetSuccessfulState(CapPublishedMessage message)
private async Task SetSuccessfulState(MediumMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);
return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
message.ExpiresAt = DateTime.Now.AddSeconds(_options.Value.SucceedMessageExpiredAfter);
await _dataStorage.ChangePublishStateAsync(message, StatusName.Succeeded);
}
private async Task<bool> SetFailedState(CapPublishedMessage message, Exception ex)
private async Task<bool> SetFailedState(MediumMessage message, Exception ex)
{
AddErrorReasonToContent(message, ex);
//TODO: Add exception to content
var needRetry = UpdateMessageForRetry(message);
await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
await _dataStorage.ChangePublishStateAsync(message, StatusName.Failed);
return needRetry;
}
private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception)
{
message.Content = Helper.AddExceptionProperty(message.Content, exception);
}
private bool UpdateMessageForRetry(CapPublishedMessage message)
private bool UpdateMessageForRetry(MediumMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var retries = ++message.Retries;
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
var retryCount = Math.Min(_options.Value.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
if (retries == _options.FailedRetryCount)
if (retries == _options.Value.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
_options.Value.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Origin);
_logger.SenderAfterThreshold(message.Id, _options.FailedRetryCount);
_logger.SenderAfterThreshold(message.DbId, _options.Value.FailedRetryCount);
}
catch (Exception ex)
{
......@@ -142,57 +138,47 @@ namespace DotNetCore.CAP
return false;
}
_logger.SenderRetrying(message.Id, retries);
_logger.SenderRetrying(message.DbId, retries);
return true;
}
private (Guid, TracingHeaders) TracingBefore(string topic, string values)
private Guid? TracingBefore(Message message)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapBeforePublish))
{
Guid operationId = Guid.NewGuid();
var operationId = Guid.NewGuid();
var eventData = new BrokerPublishEventData(
operationId, "",
ServersAddress, topic,
values,
DateTimeOffset.UtcNow);
var eventData = new BrokerPublishEventData(operationId, "",_transport.Address, message,DateTimeOffset.UtcNow);
s_diagnosticListener.WritePublishBefore(eventData);
s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapBeforePublish, eventData);
return (operationId, eventData.Headers); //if not enabled diagnostics ,the header will be null
return operationId;
}
private void TracingAfter(Guid operationId, string topic, string values, DateTimeOffset startTime, TimeSpan du)
return null;
}
private void TracingAfter(Guid operationId, Message message, DateTimeOffset startTime, TimeSpan du)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapAfterPublish))
{
var eventData = new BrokerPublishEndEventData(
operationId,
"",
ServersAddress,
topic,
values,
startTime,
du);
var eventData = new BrokerPublishEndEventData(operationId, "", _transport.Address, message, startTime, du);
s_diagnosticListener.WritePublishAfter(eventData);
s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapAfterPublish, eventData);
}
}
private void TracingError(Guid operationId, CapPublishedMessage message, OperateResult result, DateTimeOffset startTime, TimeSpan du)
private void TracingError(Guid operationId, Message message, OperateResult result, DateTimeOffset startTime, TimeSpan du)
{
if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerExtensions.CapAfterPublish))
{
var ex = new PublisherSentFailedException(result.ToString(), result.Exception);
var eventData = new BrokerPublishErrorEventData(operationId, "", _transport.Address,
message, ex, startTime, du);
_logger.MessagePublishException(message.Id, result.ToString(), ex);
var eventData = new BrokerPublishErrorEventData(
operationId,
"",
ServersAddress,
message.Name,
message.Content,
ex,
startTime,
du);
s_diagnosticListener.WritePublishError(eventData);
s_diagnosticListener.Write(CapDiagnosticListenerExtensions.CapErrorPublish, eventData);
}
}
}
}
\ No newline at end of file
......@@ -2,11 +2,12 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Threading.Tasks;
using DotNetCore.CAP.Persistence;
namespace DotNetCore.CAP.Internal
{
internal interface ICallbackMessageSender
public interface IMessageSender
{
Task SendAsync(string messageId, string topicName, object bodyObj);
Task<OperateResult> SendAsync(MediumMessage message);
}
}
\ No newline at end of file
......@@ -10,12 +10,12 @@ namespace DotNetCore.CAP
[SuppressMessage("ReSharper", "InconsistentNaming")]
internal static class LoggerExtensions
{
public static void ConsumerExecutedAfterThreshold(this ILogger logger, long messageId, int retries)
public static void ConsumerExecutedAfterThreshold(this ILogger logger, string messageId, int retries)
{
logger.LogWarning($"The Subscriber of the message({messageId}) still fails after {retries}th executions and we will stop retrying.");
}
public static void SenderAfterThreshold(this ILogger logger, long messageId, int retries)
public static void SenderAfterThreshold(this ILogger logger, string messageId, int retries)
{
logger.LogWarning($"The Publisher of the message({messageId}) still fails after {retries}th sends and we will stop retrying.");
}
......@@ -25,12 +25,12 @@ namespace DotNetCore.CAP
logger.LogWarning(ex, "FailedThresholdCallback action raised an exception:" + ex.Message);
}
public static void ConsumerExecutionRetrying(this ILogger logger, long messageId, int retries)
public static void ConsumerExecutionRetrying(this ILogger logger, string messageId, int retries)
{
logger.LogWarning($"The {retries}th retrying consume a message failed. message id: {messageId}");
}
public static void SenderRetrying(this ILogger logger, long messageId, int retries)
public static void SenderRetrying(this ILogger logger, string messageId, int retries)
{
logger.LogWarning($"The {retries}th retrying send a message failed. message id: {messageId} ");
}
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Collections.Generic;
namespace DotNetCore.CAP
{
/// <summary>
/// Message context
/// </summary>
public class MessageContext
{
/// <summary>
/// Gets or sets the message group.
/// </summary>
public string Group { get; set; }
/// <summary>
/// Message name.
/// </summary>
public string Name { get; set; }
/// <summary>
/// Message content
/// </summary>
public string Content { get; set; }
public override string ToString()
{
return $"Group:{Group}, Name:{Name}, Content:{Content}";
}
}
}
\ No newline at end of file
......@@ -4,7 +4,7 @@
using System;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Models
namespace DotNetCore.CAP.Messages
{
public abstract class CapMessage
{
......@@ -36,6 +36,5 @@ namespace DotNetCore.CAP.Models
public override string Content { get; set; }
public override string CallbackName { get; set; }
}
}
\ No newline at end of file
namespace DotNetCore.CAP.Messages
{
public static class Headers
{
/// <summary>
/// Id of the message. Either set the ID explicitly when sending a message, or Rebus will assign one to the message.
/// </summary>
public const string MessageId = "cap-msg-id";
public const string MessageName = "cap-msg-name";
public const string CorrelationId = "cap-corr-id";
public const string CorrelationSequence = "cap-corr-seq";
/// <summary>
/// Message value .NET type
/// </summary>
public const string Type = "cap-msg-type";
public const string CallbackName = "cap-callback-name";
public const string Group = "cap-msg-group";
public const string SentTime = "cap-senttime";
public const string ContentType = "cap-content-type";
}
}
using System;
using System.Collections.Generic;
namespace DotNetCore.CAP.Messages
{
public class Message
{
public Message(IDictionary<string, string> headers, object value)
{
Headers = headers ?? throw new ArgumentNullException(nameof(headers));
Value = value ?? throw new ArgumentNullException(nameof(value));
}
public IDictionary<string, string> Headers { get; }
public object Value { get; }
}
public static class MessageExtensions
{
public static string GetId(this Message message)
{
message.Headers.TryGetValue(Headers.MessageId, out var value);
return value;
}
public static string GetName(this Message message)
{
message.Headers.TryGetValue(Headers.MessageName, out var value);
return value;
}
public static string GetCallbackName(this Message message)
{
message.Headers.TryGetValue(Headers.CallbackName, out var value);
return value;
}
public static string GetGroup(this Message message)
{
message.Headers.TryGetValue(Headers.Group, out var value);
return value;
}
public static int GetCorrelationSequence(this Message message)
{
if (message.Headers.TryGetValue(Headers.CorrelationSequence, out var value))
{
return int.Parse(value);
}
return 0;
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.Models
namespace DotNetCore.CAP.Messages
{
public enum MessageType
{
......
using System;
using System.Collections.Generic;
namespace DotNetCore.CAP.Messages
{
/// <summary>
/// Message content field
/// </summary>
public class TransportMessage
{
public TransportMessage(Dictionary<string, string> headers, byte[] body)
{
Headers = headers ?? throw new ArgumentNullException(nameof(headers));
Body = body ?? throw new ArgumentNullException(nameof(body));
}
/// <summary>
/// Gets the headers of this message
/// </summary>
public Dictionary<string, string> Headers { get; }
/// <summary>
/// Gets the body object of this message
/// </summary>
public byte[] Body { get; }
public string GetName()
{
return Headers.TryGetValue(Messages.Headers.MessageName, out var value) ? value : null;
}
}
}
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
namespace DotNetCore.CAP.Models
{
public class CapReceivedMessage
{
/// <summary>
/// Initializes a new instance of <see cref="CapReceivedMessage" />.
/// </summary>
public CapReceivedMessage()
{
Added = DateTime.Now;
}
public CapReceivedMessage(MessageContext message) : this()
{
Group = message.Group;
Name = message.Name;
Content = message.Content;
}
public long Id { get; set; }
public string Group { get; set; }
public string Name { get; set; }
public string Content { get; set; }
public DateTime Added { get; set; }
public DateTime? ExpiresAt { get; set; }
public int Retries { get; set; }
public string StatusName { get; set; }
public MessageContext ToMessageContext()
{
return new MessageContext
{
Group = Group,
Name = Name,
Content = Content
};
}
public override string ToString()
{
return "name:" + Name + ", group:" + Group + ", content:" + Content;
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
namespace DotNetCore.CAP.Messages
{
public class CapPublishedMessage
{
public CapPublishedMessage()
{
Added = DateTime.Now;
}
public Message Message { get; set; }
public DateTime Added { get; set; }
public DateTime? ExpiresAt { get; set; }
public int Retries { get; set; }
public string StatusName { get; set; }
}
}
\ No newline at end of file
......@@ -3,20 +3,22 @@
using System;
namespace DotNetCore.CAP.Models
namespace DotNetCore.CAP.Messages
{
public class CapPublishedMessage
public class CapReceivedMessage
{
/// <summary>
/// Initializes a new instance of <see cref="CapPublishedMessage" />.
/// Initializes a new instance of <see cref="CapReceivedMessage" />.
/// </summary>
public CapPublishedMessage()
public CapReceivedMessage()
{
Added = DateTime.Now;
}
public long Id { get; set; }
public string Group { get; set; }
public string Name { get; set; }
public string Content { get; set; }
......@@ -31,7 +33,7 @@ namespace DotNetCore.CAP.Models
public override string ToString()
{
return "name:" + Name + ", content:" + Content;
return "name:" + Name + ", group:" + Group + ", content:" + Content;
}
}
}
\ No newline at end of file
//using System;
//using System.Collections.Generic;
//using DotNetCore.CAP.Dashboard.Monitoring;
//using DotNetCore.CAP.Messages;
//namespace DotNetCore.CAP.Persistence
//{
// public interface IDashboardQuerying
// {
// StatisticsDto GetStatistics();
// IList<MessageDto> Messages(MessageQueryDto queryDto);
// int PublishedFailedCount();
// int PublishedSucceededCount();
// int ReceivedFailedCount();
// int ReceivedSucceededCount();
// IDictionary<DateTime, int> HourlySucceededJobs(MessageType type);
// IDictionary<DateTime, int> HourlyFailedJobs(MessageType type);
// }
//}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Persistence
{
public interface IDataStorage
{
Task ChangePublishStateAsync(MediumMessage message, StatusName state);
Task ChangeReceiveStateAsync(MediumMessage message, StatusName state);
Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null, CancellationToken cancellationToken = default);
Task<MediumMessage> StoreMessageAsync(string name, string group, Message content, CancellationToken cancellationToken = default);
Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default);
Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry();
Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry();
//Task<CapPublishedMessage> GetPublishedMessageAsync(long id);
//Task<CapReceivedMessage> GetReceivedMessageAsync(long id);
//public void UpdateMessage(CapPublishedMessage message)
//{
// if (message == null)
// {
// throw new ArgumentNullException(nameof(message));
// }
// var sql =
// $"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
// _dbConnection.Execute(sql, message);
//}
//public void UpdateMessage(CapReceivedMessage message)
//{
// if (message == null)
// {
// throw new ArgumentNullException(nameof(message));
// }
// var sql = $"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`Content`= @Content,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
// _dbConnection.Execute(sql, message);
//}
}
}
using System.Threading;
using System.Threading.Tasks;
namespace DotNetCore.CAP.Persistence
{
public interface IStorageInitializer
{
Task InitializeAsync(CancellationToken cancellationToken);
string GetPublishedTableName();
string GetReceivedTableName();
}
}
using System;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Persistence
{
public class MediumMessage
{
public string DbId { get; set; }
public Message Origin { get; set; }
public DateTime Added { get; set; }
public DateTime? ExpiresAt { get; set; }
public int Retries { get; set; }
}
}
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace DotNetCore.CAP.Processor
{
public interface ICollectProcessor : IProcessor
{
}
}
\ No newline at end of file
......@@ -5,7 +5,8 @@ using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Processor
......@@ -13,19 +14,18 @@ namespace DotNetCore.CAP.Processor
public class Dispatcher : IDispatcher, IDisposable
{
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly IMessageSender _sender;
private readonly ISubscriberExecutor _executor;
private readonly ILogger<Dispatcher> _logger;
private readonly BlockingCollection<CapPublishedMessage> _publishedMessageQueue =
new BlockingCollection<CapPublishedMessage>(new ConcurrentQueue<CapPublishedMessage>());
private readonly BlockingCollection<MediumMessage> _publishedMessageQueue =
new BlockingCollection<MediumMessage>(new ConcurrentQueue<MediumMessage>());
private readonly BlockingCollection<CapReceivedMessage> _receivedMessageQueue =
new BlockingCollection<CapReceivedMessage>(new ConcurrentQueue<CapReceivedMessage>());
private readonly IPublishMessageSender _sender;
private readonly BlockingCollection<MediumMessage> _receivedMessageQueue =
new BlockingCollection<MediumMessage>(new ConcurrentQueue<MediumMessage>());
public Dispatcher(ILogger<Dispatcher> logger,
IPublishMessageSender sender,
IMessageSender sender,
ISubscriberExecutor executor)
{
_logger = logger;
......@@ -36,12 +36,12 @@ namespace DotNetCore.CAP.Processor
Task.Factory.StartNew(Processing, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
public void EnqueueToPublish(CapPublishedMessage message)
public void EnqueueToPublish(MediumMessage message)
{
_publishedMessageQueue.Add(message);
}
public void EnqueueToExecute(CapReceivedMessage message)
public void EnqueueToExecute(MediumMessage message)
{
_receivedMessageQueue.Add(message);
}
......@@ -67,7 +67,7 @@ namespace DotNetCore.CAP.Processor
}
catch (Exception ex)
{
_logger.LogError(ex, $"An exception occurred when sending a message to the MQ. Topic:{message.Name}, Id:{message.Id}");
_logger.LogError(ex, $"An exception occurred when sending a message to the MQ. Id:{message.DbId}");
}
});
}
......
......@@ -95,7 +95,7 @@ namespace DotNetCore.CAP.Processor
{
_provider.GetRequiredService<TransportCheckProcessor>(),
_provider.GetRequiredService<MessageNeedToRetryProcessor>(),
_provider.GetRequiredService<ICollectProcessor>()
_provider.GetRequiredService<CollectorProcessor>()
};
return returnedProcessors.ToArray();
......
......@@ -3,56 +3,55 @@
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
namespace DotNetCore.CAP.Processor
{
internal class MySqlCollectProcessor : ICollectProcessor
public class CollectorProcessor : IProcessor
{
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger _logger;
private readonly MySqlOptions _options;
private readonly IStorageInitializer _initializer;
private readonly IDataStorage _storage;
private const int ItemBatch = 1000;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
public MySqlCollectProcessor(ILogger<MySqlCollectProcessor> logger, IOptions<MySqlOptions> mysqlOptions)
public CollectorProcessor(
ILogger<CollectorProcessor> logger,
IStorageInitializer initializer,
IDataStorage storage)
{
_logger = logger;
_options = mysqlOptions.Value;
_initializer = initializer;
_storage = storage;
}
public async Task ProcessAsync(ProcessingContext context)
{
var tables = new[]
{
$"{_options.TableNamePrefix}.published",
$"{_options.TableNamePrefix}.received"
_initializer.GetPublishedTableName(),
_initializer.GetReceivedTableName()
};
foreach (var table in tables)
{
_logger.LogDebug($"Collecting expired data from table [{table}].");
int removedCount;
int deletedCount;
var time = DateTime.Now;
do
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
removedCount = await connection.ExecuteAsync(
$@"DELETE FROM `{table}` WHERE ExpiresAt < @now limit @count;",
new { now = DateTime.Now, count = MaxBatch });
}
deletedCount = await _storage.DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken);
if (removedCount != 0)
if (deletedCount != 0)
{
await context.WaitAsync(_delay);
context.ThrowIfStopping();
}
} while (removedCount != 0);
} while (deletedCount != 0);
}
await context.WaitAsync(_waitingInterval);
......
......@@ -5,6 +5,8 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
......@@ -15,7 +17,7 @@ namespace DotNetCore.CAP.Processor
{
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger<MessageNeedToRetryProcessor> _logger;
private readonly IPublishMessageSender _publishMessageSender;
private readonly IMessageSender _messageSender;
private readonly ISubscriberExecutor _subscriberExecutor;
private readonly TimeSpan _waitingInterval;
......@@ -23,11 +25,11 @@ namespace DotNetCore.CAP.Processor
IOptions<CapOptions> options,
ILogger<MessageNeedToRetryProcessor> logger,
ISubscriberExecutor subscriberExecutor,
IPublishMessageSender publishMessageSender)
IMessageSender messageSender)
{
_logger = logger;
_subscriberExecutor = subscriberExecutor;
_publishMessageSender = publishMessageSender;
_messageSender = messageSender;
_waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval);
}
......@@ -38,14 +40,14 @@ namespace DotNetCore.CAP.Processor
throw new ArgumentNullException(nameof(context));
}
var connection = context.Provider.GetRequiredService<IStorageConnection>();
var storage = context.Provider.GetRequiredService<IDataStorage>();
await Task.WhenAll(ProcessPublishedAsync(connection, context), ProcessReceivedAsync(connection, context));
await Task.WhenAll(ProcessPublishedAsync(storage, context), ProcessReceivedAsync(storage, context));
await context.WaitAsync(_waitingInterval);
}
private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context)
private async Task ProcessPublishedAsync(IDataStorage connection, ProcessingContext context)
{
context.ThrowIfStopping();
......@@ -53,13 +55,13 @@ namespace DotNetCore.CAP.Processor
foreach (var message in messages)
{
await _publishMessageSender.SendAsync(message);
await _messageSender.SendAsync(message);
await context.WaitAsync(_delay);
}
}
private async Task ProcessReceivedAsync(IStorageConnection connection, ProcessingContext context)
private async Task ProcessReceivedAsync(IDataStorage connection, ProcessingContext context)
{
context.ThrowIfStopping();
......
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Processor.States
{
public class FailedState : IState
{
public const string StateName = "Failed";
public TimeSpan? ExpiresAfter => TimeSpan.FromDays(15);
public string Name => StateName;
public void Apply(CapPublishedMessage message, IStorageTransaction transaction)
{
}
public void Apply(CapReceivedMessage message, IStorageTransaction transaction)
{
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Processor.States
{
public class ScheduledState : IState
{
public const string StateName = "Scheduled";
public TimeSpan? ExpiresAfter => null;
public string Name => StateName;
public void Apply(CapPublishedMessage message, IStorageTransaction transaction)
{
}
public void Apply(CapReceivedMessage message, IStorageTransaction transaction)
{
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Processor.States
{
public class SucceededState : IState
{
public const string StateName = "Succeeded";
public SucceededState()
{
ExpiresAfter = TimeSpan.FromHours(1);
}
public SucceededState(int expireAfterSeconds)
{
ExpiresAfter = TimeSpan.FromSeconds(expireAfterSeconds);
}
public TimeSpan? ExpiresAfter { get; }
public string Name => StateName;
public void Apply(CapPublishedMessage message, IStorageTransaction transaction)
{
}
public void Apply(CapReceivedMessage message, IStorageTransaction transaction)
{
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Processor.States
{
public interface IState
{
TimeSpan? ExpiresAfter { get; }
string Name { get; }
void Apply(CapPublishedMessage message, IStorageTransaction transaction);
void Apply(CapReceivedMessage message, IStorageTransaction transaction);
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Processor.States
{
public class StateChanger : IStateChanger
{
public void ChangeState(CapPublishedMessage message, IState state, IStorageTransaction transaction)
{
var now = DateTime.Now;
if (state.ExpiresAfter != null)
{
message.ExpiresAt = now.Add(state.ExpiresAfter.Value);
}
else
{
message.ExpiresAt = null;
}
message.StatusName = state.Name;
state.Apply(message, transaction);
transaction.UpdateMessage(message);
}
public void ChangeState(CapReceivedMessage message, IState state, IStorageTransaction transaction)
{
var now = DateTime.Now;
if (state.ExpiresAfter != null)
{
message.ExpiresAt = now.Add(state.ExpiresAfter.Value);
}
else
{
message.ExpiresAt = null;
}
message.StatusName = state.Name;
state.Apply(message, transaction);
transaction.UpdateMessage(message);
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Processor.States
{
public static class StateChangerExtensions
{
public static async Task ChangeStateAsync(
this IStateChanger @this, CapPublishedMessage message, IState state, IStorageConnection connection)
{
using (var transaction = connection.CreateTransaction())
{
@this.ChangeState(message, state, transaction);
await transaction.CommitAsync();
}
}
public static async Task ChangeStateAsync(
this IStateChanger @this, CapReceivedMessage message, IState state, IStorageConnection connection)
{
using (var transaction = connection.CreateTransaction())
{
@this.ChangeState(message, state, transaction);
await transaction.CommitAsync();
}
}
}
}
\ No newline at end of file
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Processor.States
{
public interface IStateChanger
{
void ChangeState(CapPublishedMessage message, IState state, IStorageTransaction transaction);
void ChangeState(CapReceivedMessage message, IState state, IStorageTransaction transaction);
}
}
\ No newline at end of file
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Serialization
{
public interface ISerializer
{
/// <summary>
/// Serializes the given <see cref="Message"/> into a <see cref="TransportMessage"/>
/// </summary>
Task<TransportMessage> SerializeAsync(Message message);
/// <summary>
/// Deserializes the given <see cref="TransportMessage"/> back into a <see cref="Message"/>
/// </summary>
Task<Message> DeserializeAsync(TransportMessage transportMessage);
}
}
\ No newline at end of file
using System;
using System.Runtime.Serialization;
using DotNetCore.CAP.Messages;
using Newtonsoft.Json;
namespace DotNetCore.CAP.Serialization
{
public class StringSerializer
{
public static string Serialize(Message message)
{
return JsonConvert.SerializeObject(message);
}
public static Message DeSerialize(string json)
{
try
{
return JsonConvert.DeserializeObject<Message>(json);
}
catch (Exception exception)
{
throw new SerializationException($"Could not deserialize JSON text '{json}'", exception);
}
}
}
}
\ No newline at end of file
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
namespace DotNetCore.CAP.Transport
{
public interface ITransport
{
string Address { get; }
Task<OperateResult> SendAsync(TransportMessage message);
}
}
......@@ -2,7 +2,7 @@ using System;
using System.Linq;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using FluentAssertions;
using Xunit;
......
using System;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Driver;
......
......@@ -2,7 +2,7 @@
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
using Xunit;
......
......@@ -2,7 +2,7 @@
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.Options;
using Xunit;
......
......@@ -2,7 +2,7 @@
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Xunit;
namespace DotNetCore.CAP.SqlServer.Test
......
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Xunit;
......@@ -130,6 +131,12 @@ namespace DotNetCore.CAP.Test
throw new NotImplementedException();
}
public Task PublishAsync<T>(string name, T contentObj, IDictionary<string, string> optionHeaders = null,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public void Publish<T>(string name, T contentObj, string callbackName = null)
{
throw new NotImplementedException();
......
......@@ -2,7 +2,7 @@
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using Xunit;
......
......@@ -3,7 +3,7 @@ using System.Reflection;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
......
using System;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using Newtonsoft.Json;
using Xunit;
......
using System;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Processor.States;
using Moq;
using Xunit;
......
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using Newtonsoft.Json;
using Xunit;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment