Commit d7b1b586 authored by Savorboard's avatar Savorboard

Merge branch 'develop' of https://github.com/dotnetcore/CAP into develop

parents 4216c40a d99ec600
...@@ -37,7 +37,7 @@ namespace Sample.Kafka.MySql.Controllers ...@@ -37,7 +37,7 @@ namespace Sample.Kafka.MySql.Controllers
[CapSubscribe("xxx.xxx.test2")] [CapSubscribe("xxx.xxx.test2")]
public void Test2(int value) public void Test2(int value)
{ {
Console.WriteLine(value); Console.WriteLine("Subscriber output message: " + value);
} }
} }
} }
\ No newline at end of file
using Microsoft.AspNetCore.Builder; using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
namespace Sample.Kafka.MySql namespace Sample.Kafka.MySql
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
internalLogFile="logs/internal-nlog.txt"> internalLogFile="logs/internal-nlog.txt">
<variable name="myLogLayout" <variable name="myLogLayout"
value="---------------------------------------------------------------------------${newline}日期:${longdate} 级别:${uppercase:${level}} 用户:${aspnet-user-identity}(${aspnet-request-ip}) 记录器:${logger} ${newline}URL:${aspnet-request-method} ${aspnet-request-url:IncludePort=true:IncludeQueryString=true} ${newline}Action:${aspnet-mvc-action} ${newline}Message:${message} ${newline}${onexception:Exception:${exception:format=toString}}" /> value="---------------------------------------------------------------------------${newline}Date:${longdate} Level:${uppercase:${level}} User:${aspnet-user-identity}(${aspnet-request-ip}) Logger:${logger} URL:${aspnet-request-method} ${aspnet-request-url:IncludePort=true:IncludeQueryString=true} Action:${aspnet-mvc-action} ${newline}Message:${message} ${newline}${onexception:Exception:${exception:format=toString}}" />
<extensions> <extensions>
<add assembly="NLog.Web.AspNetCore" /> <add assembly="NLog.Web.AspNetCore" />
</extensions> </extensions>
......
using System; using Microsoft.EntityFrameworkCore;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
namespace Sample.RabbitMQ.MySql namespace Sample.RabbitMQ.MySql
{ {
...@@ -10,8 +6,7 @@ namespace Sample.RabbitMQ.MySql ...@@ -10,8 +6,7 @@ namespace Sample.RabbitMQ.MySql
{ {
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{ {
optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;Allow User Variables=True"); optionsBuilder.UseMySql("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;");
//optionsBuilder.UseMySql("Server=192.168.2.206;Database=Sample.RabbitMQ.MySql;UserId=root;Password=123123;");
} }
} }
} }
...@@ -21,7 +21,7 @@ namespace Sample.RabbitMQ.MySql.Controllers ...@@ -21,7 +21,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
public IActionResult PublishMessage() public IActionResult PublishMessage()
{ {
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
return Ok(); return Ok();
} }
...@@ -50,7 +50,7 @@ namespace Sample.RabbitMQ.MySql.Controllers ...@@ -50,7 +50,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
[CapSubscribe("sample.rabbitmq.mysql")] [CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage(DateTime time) public void ReceiveMessage(DateTime time)
{ {
Console.WriteLine("[sample.rabbitmq.mysql] message received: "+ DateTime.Now.ToString() +" , sent time: " + time.ToString()); Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time);
} }
} }
} }
using System; using Microsoft.AspNetCore;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration; using NLog.Web;
namespace Sample.RabbitMQ.MySql namespace Sample.RabbitMQ.MySql
{ {
...@@ -20,6 +14,11 @@ namespace Sample.RabbitMQ.MySql ...@@ -20,6 +14,11 @@ namespace Sample.RabbitMQ.MySql
public static IWebHost BuildWebHost(string[] args) => public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args) WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>() .UseStartup<Startup>()
.ConfigureLogging((hostingContext, builder) =>
{
hostingContext.HostingEnvironment.ConfigureNLog("nlog.config");
})
.UseNLog()
.Build(); .Build();
} }
} }
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" /> <PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.6" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.0.1" /> <PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.0.1" />
<PackageReference Include="NLog.Web.AspNetCore" Version="4.5.2" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" /> <DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
...@@ -21,5 +22,10 @@ ...@@ -21,5 +22,10 @@
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" /> <ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Content Update="nlog.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
</Project> </Project>
<?xml version="1.0" encoding="utf-8"?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
internalLogLevel="Warn"
internalLogFile="logs/internal-nlog.txt">
<variable name="myLogLayout"
value="---------------------------------------------------------------------------${newline}Date:${longdate} Level:${uppercase:${level}} User:${aspnet-user-identity}(${aspnet-request-ip}) Logger:${logger} URL:${aspnet-request-method} ${aspnet-request-url:IncludePort=true:IncludeQueryString=true} Action:${aspnet-mvc-action} ${newline}Message:${message} ${newline}${onexception:Exception:${exception:format=toString}}" />
<extensions>
<add assembly="NLog.Web.AspNetCore" />
</extensions>
<!-- define various log targets -->
<targets>
<!-- write logs to file -->
<target name="allfile" xsi:type="File" fileName="logs/cap-all-${shortdate}.log"
layout="${myLogLayout}" />
</targets>
<rules>
<!--All logs, including from Microsoft-->
<logger name="*" minlevel="Debug" writeTo="allfile" />
</rules>
</nlog>
\ No newline at end of file
...@@ -58,9 +58,9 @@ namespace DotNetCore.CAP ...@@ -58,9 +58,9 @@ namespace DotNetCore.CAP
public int FailedRetryInterval { get; set; } public int FailedRetryInterval { get; set; }
/// <summary> /// <summary>
/// We’ll invoke this call-back with message type,name,content when requeue failed message. /// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals <see cref="FailedRetryCount"/> times.
/// </summary> /// </summary>
public Action<MessageType, string, string> FailedCallback { get; set; } public Action<MessageType, string, string> FailedThresholdCallback { get; set; }
/// <summary> /// <summary>
/// The number of message retries, the retry will stop when the threshold is reached. /// The number of message retries, the retry will stop when the threshold is reached.
......
...@@ -178,6 +178,8 @@ namespace DotNetCore.CAP ...@@ -178,6 +178,8 @@ namespace DotNetCore.CAP
private (Guid, string) TracingBefore(string topic, string values) private (Guid, string) TracingBefore(string topic, string values)
{ {
_logger.LogDebug("CAP received topic message:" + topic);
Guid operationId = Guid.NewGuid(); Guid operationId = Guid.NewGuid();
var eventData = new BrokerConsumeEventData( var eventData = new BrokerConsumeEventData(
......
...@@ -65,19 +65,6 @@ namespace DotNetCore.CAP.Processor ...@@ -65,19 +65,6 @@ namespace DotNetCore.CAP.Processor
continue; continue;
} }
if (!hasException)
{
try
{
_options.FailedCallback?.Invoke(MessageType.Publish, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
using (var transaction = connection.CreateTransaction()) using (var transaction = connection.CreateTransaction())
{ {
var result = await _publishExecutor.PublishAsync(message.Name, message.Content); var result = await _publishExecutor.PublishAsync(message.Name, message.Content);
...@@ -101,6 +88,21 @@ namespace DotNetCore.CAP.Processor ...@@ -101,6 +88,21 @@ namespace DotNetCore.CAP.Processor
{ {
_logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " + _logger.LogError($"The message still sent failed after {_options.FailedRetryCount} retries. We will stop retrying the message. " +
"MessageId:" + message.Id); "MessageId:" + message.Id);
if (message.Retries == _options.FailedRetryCount)
{
if (!hasException)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
}
} }
} }
await transaction.CommitAsync(); await transaction.CommitAsync();
...@@ -124,19 +126,6 @@ namespace DotNetCore.CAP.Processor ...@@ -124,19 +126,6 @@ namespace DotNetCore.CAP.Processor
continue; continue;
} }
if (!hasException)
{
try
{
_options.FailedCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
using (var transaction = connection.CreateTransaction()) using (var transaction = connection.CreateTransaction())
{ {
var result = await _subscriberExecutor.ExecuteAsync(message); var result = await _subscriberExecutor.ExecuteAsync(message);
...@@ -160,6 +149,22 @@ namespace DotNetCore.CAP.Processor ...@@ -160,6 +149,22 @@ namespace DotNetCore.CAP.Processor
{ {
_logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " + _logger.LogError($"[Subscriber]The message still executed failed after {_options.FailedRetryCount} retries. " +
"We will stop retrying to execute the message. message id:" + message.Id); "We will stop retrying to execute the message. message id:" + message.Id);
if (message.Retries == _options.FailedRetryCount)
{
if (!hasException)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);
}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}
}
} }
} }
await transaction.CommitAsync(); await transaction.CommitAsync();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment