Commit 47d579cd authored by Savorboard's avatar Savorboard

merge from develop.

parents d46dab2d 6f917898
......@@ -32,3 +32,4 @@ obj/
bin/
/.idea/.idea.CAP
/.idea/.idea.CAP
/.idea

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26430.14
VisualStudioVersion = 15.0.26430.15
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}"
EndProject
......@@ -33,8 +33,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Shared", "Shared", "{9E5A7F
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP", "src\DotNetCore.CAP\DotNetCore.CAP.csproj", "{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.EntityFrameworkCore", "src\DotNetCore.CAP.EntityFrameworkCore\DotNetCore.CAP.EntityFrameworkCore.csproj", "{96111249-C4C3-4DC9-A887-32D583723AB1}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{3A6B6931-A123-477A-9469-8B468B5385AF}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka", "samples\Sample.Kafka\Sample.Kafka.csproj", "{2F095ED9-5BC9-4512-9013-A47685FB2508}"
......@@ -55,10 +53,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{10C0818D
build\version.props = build\version.props
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.EntityFrameworkCore.Test", "test\DotNetCore.CAP.EntityFrameworkCore.Test\DotNetCore.CAP.EntityFrameworkCore.Test.csproj", "{69370370-9873-4D6A-965D-D1E16694047D}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Test", "test\DotNetCore.CAP.Test\DotNetCore.CAP.Test.csproj", "{F608B509-A99B-4AC7-8227-42051DD4A578}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.SqlServer", "src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj", "{3B577468-6792-4EF1-9237-15180B176A24}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.SqlServer.Test", "test\DotNetCore.CAP.SqlServer.Test\DotNetCore.CAP.SqlServer.Test.csproj", "{DA00FA38-C4B9-4F55-8756-D480FBC1084F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -69,10 +69,6 @@ Global
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Release|Any CPU.Build.0 = Release|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Release|Any CPU.Build.0 = Release|Any CPU
{2F095ED9-5BC9-4512-9013-A47685FB2508}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2F095ED9-5BC9-4512-9013-A47685FB2508}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2F095ED9-5BC9-4512-9013-A47685FB2508}.Release|Any CPU.ActiveCfg = Release|Any CPU
......@@ -85,14 +81,17 @@ Global
{9961B80E-0718-4280-B2A0-271B003DE26B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9961B80E-0718-4280-B2A0-271B003DE26B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9961B80E-0718-4280-B2A0-271B003DE26B}.Release|Any CPU.Build.0 = Release|Any CPU
{69370370-9873-4D6A-965D-D1E16694047D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{69370370-9873-4D6A-965D-D1E16694047D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{69370370-9873-4D6A-965D-D1E16694047D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{69370370-9873-4D6A-965D-D1E16694047D}.Release|Any CPU.Build.0 = Release|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Release|Any CPU.Build.0 = Release|Any CPU
{3B577468-6792-4EF1-9237-15180B176A24}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3B577468-6792-4EF1-9237-15180B176A24}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3B577468-6792-4EF1-9237-15180B176A24}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3B577468-6792-4EF1-9237-15180B176A24}.Release|Any CPU.Build.0 = Release|Any CPU
{DA00FA38-C4B9-4F55-8756-D480FBC1084F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DA00FA38-C4B9-4F55-8756-D480FBC1084F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DA00FA38-C4B9-4F55-8756-D480FBC1084F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DA00FA38-C4B9-4F55-8756-D480FBC1084F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -100,11 +99,11 @@ Global
GlobalSection(NestedProjects) = preSolution
{9E5A7F49-8E31-4A71-90CC-1DA9AEDA99EE} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{96111249-C4C3-4DC9-A887-32D583723AB1} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{2F095ED9-5BC9-4512-9013-A47685FB2508} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{9961B80E-0718-4280-B2A0-271B003DE26B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{69370370-9873-4D6A-965D-D1E16694047D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{F608B509-A99B-4AC7-8227-42051DD4A578} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{3B577468-6792-4EF1-9237-15180B176A24} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{DA00FA38-C4B9-4F55-8756-D480FBC1084F} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
EndGlobalSection
EndGlobal
......@@ -6,6 +6,7 @@
[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP)
[![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap)
[![NuGet](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![Member Project Of .NET China Foundation](https://github.com/dotnetcore/Home/raw/master/icons/member-project-of-netchina.png)](https://github.com/dotnetcore)
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt)
CAP 是一个在分布式系统(SOA、MicroService)中实现最终一致性的库,它具有轻量级、易使用、高性能等特点。
......@@ -30,6 +31,10 @@ CAP 具有消息持久化的功能,当你的服务进行重启或者宕机时
你可以运行以下下命令在你的项目中安装 CAP。
```
PM> Install-Package DotNetCore.CAP -Pre
```
如果你的消息队列使用的是 Kafka 的话,你可以:
```
......@@ -42,10 +47,10 @@ PM> Install-Package DotNetCore.CAP.Kafka -Pre
PM> Install-Package DotNetCore.CAP.RabbitMQ -Pre
```
CAP 默认提供了 Entity Framwork 作为数据库存储
CAP 默认提供了 Sql Server 的扩展作为数据库存储(MySql的正在开发中)
```
PM> Install-Package DotNetCore.CAP.EntityFrameworkCore -Pre
PM> Install-Package DotNetCore.CAP.SqlServer -Pre
```
### Configuration
......@@ -57,11 +62,23 @@ public void ConfigureServices(IServiceCollection services)
{
......
services.AddDbContext<AppDbContext>();
services.AddDbContext<AppDbContext>();
services.AddCap()
.AddEntityFrameworkStores<AppDbContext>()
.AddKafka(x => x.Servers = "localhost:9092");
services.AddCap(x =>
{
// 如果你的 SqlServer 使用的 EF 进行数据操作,你需要添加如下配置:
// 注意: 你不需要再次配置 x.UseSqlServer(""")
x.UseEntityFramework<AppDbContext>();
// 如果你使用的Dapper,你需要添加如下配置:
x.UseSqlServer("数据库连接字符串");
// 如果你使用的 RabbitMQ 作为MQ,你需要添加如下配置:
x.UseRabbitMQ("localhost");
//如果你使用的 Kafka 作为MQ,你需要添加如下配置:
x.UseKafka("localhost");
});
}
public void Configure(IApplicationBuilder app)
......@@ -96,6 +113,18 @@ public class PublishController : Controller
return Ok();
}
[Route("~/checkAccountWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction())
{
await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 });
trans.Commit();
}
return Ok();
}
}
```
......
......@@ -3,9 +3,9 @@ os: Visual Studio 2015
environment:
BUILDING_ON_PLATFORM: win
BuildEnvironment: appveyor
Cap_SqlServer_ConnectionStringTemplate: Server=.\SQL2012SP1;Database={0};User ID=sa;Password=Password12!
Cap_SqlServer_ConnectionStringTemplate: Server=(local)\SQL2014;Database={0};User ID=sa;Password=Password12!
services:
- mssql2012sp1
- mssql2014
build_script:
- ps: ./ConfigureMSDTC.ps1
- ps: ./build.ps1
......@@ -17,6 +17,6 @@ deploy:
on:
appveyor_repo_tag: true
api_key:
secure: P4da9c6a6-00e1-47d0-a821-b62380362dc9
secure: U62rpGTEqztrUO4ncscm4XSaAoCSmWwT/rOWO/2JJS44psJvl0QpjRL0o0ughMoY
skip_symbols: true
artifact: /artifacts\/packages\/.+\.nupkg/
......@@ -2,7 +2,7 @@
<PropertyGroup>
<VersionMajor>0</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionPatch>1</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
namespace Sample.Kafka
{
public class AppDbContext : DbContext
{
public DbSet<CapSentMessage> SentMessages { get; set; }
public DbSet<CapReceivedMessage> ReceivedMessages { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<CapSentMessage>().Property(x => x.StatusName).HasMaxLength(50);
modelBuilder.Entity<CapReceivedMessage>().Property(x => x.StatusName).HasMaxLength(50);
base.OnModelCreating(modelBuilder);
//optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
}
}
}
using System;
using System.Threading.Tasks;
using DotNetCore.CAP;
using DotNetCore.CAP.Kafka;
using DotNetCore.CAP.RabbitMQ;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc;
......@@ -11,10 +11,12 @@ namespace Sample.Kafka.Controllers
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _producer;
private readonly AppDbContext _dbContext ;
public ValuesController(ICapPublisher producer)
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_producer = producer;
_dbContext = dbContext;
}
[Route("/")]
......@@ -27,15 +29,19 @@ namespace Sample.Kafka.Controllers
[CapSubscribe("zzwl.topic.finace.callBack", Group = "test")]
public void KafkaTest(Person person)
{
Console.WriteLine(person.Name);
Console.WriteLine(person.Age);
Console.WriteLine(DateTime.Now);
}
[Route("~/send")]
public async Task<IActionResult> SendTopic()
{
await _producer.PublishAsync("zzwl.topic.finace.callBack", new Person { Name = "Test", Age = 11 });
using (var trans = _dbContext.Database.BeginTransaction())
{
await _producer.PublishAsync("zzwl.topic.finace.callBack","");
trans.Commit();
}
return Ok();
}
......
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Sample.Kafka;
namespace Sample.Kafka.Migrations
{
[DbContext(typeof(AppDbContext))]
[Migration("20170710102614_InitilizeDB")]
partial class InitilizeDB
{
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
modelBuilder
.HasAnnotation("ProductVersion", "1.1.2")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("DotNetCore.CAP.Infrastructure.CapReceivedMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
b.Property<DateTime>("Added");
b.Property<string>("Content");
b.Property<string>("Group");
b.Property<string>("KeyName");
b.Property<DateTime>("LastRun");
b.Property<int>("Retries");
b.Property<string>("StatusName")
.HasMaxLength(50);
b.HasKey("Id");
b.ToTable("ReceivedMessages");
});
modelBuilder.Entity("DotNetCore.CAP.Infrastructure.CapSentMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
b.Property<DateTime>("Added");
b.Property<string>("Content");
b.Property<string>("KeyName");
b.Property<DateTime>("LastRun");
b.Property<int>("Retries");
b.Property<string>("StatusName")
.HasMaxLength(50);
b.HasKey("Id");
b.ToTable("SentMessages");
});
}
}
}
using System;
using System.Collections.Generic;
using Microsoft.EntityFrameworkCore.Migrations;
namespace Sample.Kafka.Migrations
{
public partial class InitilizeDB : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "ReceivedMessages",
columns: table => new
{
Id = table.Column<string>(nullable: false),
Added = table.Column<DateTime>(nullable: false),
Content = table.Column<string>(nullable: true),
Group = table.Column<string>(nullable: true),
KeyName = table.Column<string>(nullable: true),
LastRun = table.Column<DateTime>(nullable: false),
Retries = table.Column<int>(nullable: false),
StatusName = table.Column<string>(maxLength: 50, nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_ReceivedMessages", x => x.Id);
});
migrationBuilder.CreateTable(
name: "SentMessages",
columns: table => new
{
Id = table.Column<string>(nullable: false),
Added = table.Column<DateTime>(nullable: false),
Content = table.Column<string>(nullable: true),
KeyName = table.Column<string>(nullable: true),
LastRun = table.Column<DateTime>(nullable: false),
Retries = table.Column<int>(nullable: false),
StatusName = table.Column<string>(maxLength: 50, nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_SentMessages", x => x.Id);
});
}
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "ReceivedMessages");
migrationBuilder.DropTable(
name: "SentMessages");
}
}
}
......@@ -24,9 +24,8 @@
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="1.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.EntityFrameworkCore\DotNetCore.CAP.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
......
......@@ -25,16 +25,13 @@ namespace Sample.Kafka
{
services.AddDbContext<AppDbContext>();
services.AddCap()
.AddEntityFrameworkStores<AppDbContext>()
.AddRabbitMQ(x =>
{
x.HostName = "192.168.2.206";
x.UserName = "admin";
x.Password = "123123";
});
//.AddKafka(x => x.Servers = "");
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
//x.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
x.UseRabbitMQ(o => { o.HostName = "192.168.2.206"; o.UserName = "admin"; o.Password = "123123"; });
});
// Add framework services.
services.AddMvc();
}
......
using DotNetCore.CAP;
using DotNetCore.CAP.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Contains extension methods to <see cref="CapBuilder"/> for adding entity framework stores.
/// </summary>
public static class CapEntityFrameworkBuilderExtensions
{
/// <summary>
/// Adds an Entity Framework implementation of message stores.
/// </summary>
/// <typeparam name="TContext">The Entity Framework database context to use.</typeparam>
/// <returns>The <see cref="CapBuilder"/> instance this method extends.</returns>
public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder)
where TContext : DbContext
{
builder.Services.AddScoped<ICapMessageStore, CapMessageStore<TContext>>();
return builder;
}
}
}
\ No newline at end of file
using DotNetCore.CAP.Infrastructure;
using Microsoft.EntityFrameworkCore;
namespace DotNetCore.CAP.EntityFrameworkCore
{
/// <summary>
/// Base class for the Entity Framework database context used for CAP.
/// </summary>
public class CapDbContext : DbContext
{
/// <summary>
/// Initializes a new instance of the <see cref="CapDbContext"/>.
/// </summary>
public CapDbContext() { }
/// <summary>
/// Initializes a new instance of the <see cref="CapDbContext"/>.
/// </summary>
/// <param name="options">The options to be used by a <see cref="DbContext"/>.</param>
public CapDbContext(DbContextOptions options) : base(options) { }
/// <summary>
/// Gets or sets the <see cref="CapSentMessage"/> of Messages.
/// </summary>
public DbSet<CapSentMessage> CapSentMessages { get; set; }
/// <summary>
/// Gets or sets the <see cref="CapReceivedMessages"/> of Messages.
/// </summary>
public DbSet<CapReceivedMessage> CapReceivedMessages { get; set; }
/// <summary>
/// Configures the schema for the identity framework.
/// </summary>
/// <param name="modelBuilder">
/// The builder being used to construct the model for this context.
/// </param>
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<CapSentMessage>(b =>
{
b.HasKey(m => m.Id);
b.Property(p => p.StatusName).HasMaxLength(50);
});
modelBuilder.Entity<CapReceivedMessage>(b =>
{
b.Property(p => p.StatusName).HasMaxLength(50);
});
}
}
}
\ No newline at end of file
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using Microsoft.EntityFrameworkCore;
namespace DotNetCore.CAP.EntityFrameworkCore
{
/// <summary>
/// Represents a new instance of a persistence store for the specified message types.
/// </summary>
/// <typeparam name="TContext">The type of the data context class used to access the store.</typeparam>
public class CapMessageStore<TContext> : ICapMessageStore where TContext : DbContext
{
/// <summary>
/// Constructs a new instance of <see cref="TContext"/>.
/// </summary>
/// <param name="context">The <see cref="DbContext"/>.</param>
public CapMessageStore(TContext context)
{
Context = context ?? throw new ArgumentNullException(nameof(context));
}
public TContext Context { get; private set; }
private DbSet<CapSentMessage> SentMessages => Context.Set<CapSentMessage>();
private DbSet<CapReceivedMessage> ReceivedMessages => Context.Set<CapReceivedMessage>();
/// <summary>
/// Creates the specified <paramref name="message"/> in the cap message store.
/// </summary>
/// <param name="message">The message to create.</param>
public async Task<OperateResult> StoreSentMessageAsync(CapSentMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
Context.Add(message);
await Context.SaveChangesAsync();
return OperateResult.Success;
}
public async Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string status,
bool autoSaveChanges = true)
{
Context.Attach(message);
message.LastRun = DateTime.Now;
message.StatusName = status;
try
{
if (autoSaveChanges)
{
await Context.SaveChangesAsync();
}
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(
new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
}
return OperateResult.Success;
}
/// <summary>
/// First Enqueued Message.
/// </summary>
public async Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync()
{
return await SentMessages.FirstOrDefaultAsync(x => x.StatusName == StatusName.Enqueued);
}
/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
public async Task<OperateResult> UpdateSentMessageAsync(CapSentMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
Context.Attach(message);
message.LastRun = DateTime.Now;
Context.Update(message);
try
{
await Context.SaveChangesAsync();
return OperateResult.Success;
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
}
}
/// <summary>
/// Deletes the specified <paramref name="message"/> from the consistency message store.
/// </summary>
/// <param name="message">The message to delete.</param>
public async Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
Context.Remove(message);
try
{
await Context.SaveChangesAsync();
return OperateResult.Success;
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
}
}
/// <summary>
/// Creates the specified <paramref name="message"/> in the consistency message store.
/// </summary>
/// <param name="message">The message to create.</param>
public async Task<OperateResult> StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
Context.Add(message);
await Context.SaveChangesAsync();
return OperateResult.Success;
}
public async Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string status,
bool autoSaveChanges = true)
{
Context.Attach(message);
message.LastRun = DateTime.Now;
message.StatusName = status;
try
{
if (autoSaveChanges)
{
await Context.SaveChangesAsync();
}
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
}
return OperateResult.Success;
}
public async Task<CapReceivedMessage> GetNextReceivedMessageToBeExcuted()
{
return await ReceivedMessages.FirstOrDefaultAsync(x => x.StatusName == StatusName.Enqueued);
}
/// <summary>
/// Updates the specified <paramref name="message"/> in the message store.
/// </summary>
/// <param name="message">The message to update.</param>
public async Task<OperateResult> UpdateReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
Context.Attach(message);
message.LastRun = DateTime.Now;
Context.Update(message);
try
{
await Context.SaveChangesAsync();
return OperateResult.Success;
}
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError()
{
Code = "DbUpdateConcurrencyException",
Description = ex.Message
});
}
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Kafka;
namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Contains extension methods to <see cref="CapBuilder"/> for adding kafka service.
/// </summary>
public static class CapBuilderExtensions
{
/// <summary>
/// Adds an Kafka implementation of CAP messages queue.
/// </summary>
/// <param name="builder">The <see cref="CapBuilder"/> instance this method extends</param>
/// <param name="setupAction">An action to configure the <see cref="KafkaOptions"/>.</param>
/// <returns>An <see cref="CapBuilder"/> for creating and configuring the CAP system.</returns>
public static CapBuilder AddKafka(this CapBuilder builder, Action<KafkaOptions> setupAction)
{
if (setupAction == null) throw new ArgumentNullException(nameof(setupAction));
builder.Services.Configure(setupAction);
builder.Services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
builder.Services.AddTransient<IJobProcessor, KafkaJobProcessor>();
return builder;
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP.Kafka;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class KafkaCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<KafkaOptions> _configure;
public KafkaCapOptionsExtension(Action<KafkaOptions> configure)
{
_configure = configure;
}
public void AddServices(IServiceCollection services)
{
services.Configure(_configure);
var kafkaOptions = new KafkaOptions();
_configure(kafkaOptions);
services.AddSingleton(kafkaOptions);
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddTransient<IQueueExecutor, PublishQueueExecutor>();
}
}
}
\ No newline at end of file
......@@ -2,7 +2,8 @@
using System.Collections.Generic;
using System.Linq;
namespace DotNetCore.CAP.Kafka
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
/// <summary>
/// Provides programmatic configuration for the CAP kafka project.
......@@ -32,9 +33,9 @@ namespace DotNetCore.CAP.Kafka
internal IEnumerable<KeyValuePair<string, object>> AsRdkafkaConfig()
{
if (MainConfig.ContainsKey("bootstrap.servers"))
if (MainConfig.ContainsKey("bootstrap.servers"))
return MainConfig.AsEnumerable();
if (string.IsNullOrEmpty(Servers))
{
throw new ArgumentNullException(nameof(Servers));
......
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
public static CapOptions UseKafka(this CapOptions options, string bootstrapServers)
{
return options.UseRabbitMQ(opt =>
{
opt.Servers = bootstrapServers;
});
}
public static CapOptions UseRabbitMQ(this CapOptions options, Action<KafkaOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
options.RegisterExtension(new KafkaCapOptionsExtension(configure));
return options;
}
}
}
\ No newline at end of file
using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP.Kafka
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class CapSubscribeAttribute : TopicAttribute
{
public CapSubscribeAttribute(string topicName)
: this(topicName, 0)
public CapSubscribeAttribute(string name)
: this(name, 0)
{
}
/// <summary>
/// Not support
/// </summary>
public CapSubscribeAttribute(string topicName, int partition)
: this(topicName, partition, 0)
public CapSubscribeAttribute(string name, int partition)
: this(name, partition, 0)
{
}
/// <summary>
/// Not support
/// </summary>
public CapSubscribeAttribute(string topicName, int partition, long offset)
: base(topicName)
public CapSubscribeAttribute(string name, int partition, long offset)
: base(name)
{
Offset = offset;
Partition = partition;
......
using System;
using System.Text;
using System.Threading;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Kafka
{
......@@ -38,10 +38,11 @@ namespace DotNetCore.CAP.Kafka
_consumerClient.Subscribe(topicName);
}
public void Listening(TimeSpan timeout)
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
_consumerClient.Poll(timeout);
}
}
......@@ -73,13 +74,12 @@ namespace DotNetCore.CAP.Kafka
var message = new MessageContext
{
Group = _groupId,
KeyName = e.Topic,
Name = e.Topic,
Content = e.Value
};
MessageReceieved?.Invoke(sender, message);
}
#endregion private methods
}
}
\ No newline at end of file
using System;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Kafka
{
internal static class LoggerExtensions
{
private static readonly Action<ILogger, Exception> _collectingExpiredEntities;
private static readonly Action<ILogger, Exception> _installing;
private static readonly Action<ILogger, Exception> _installingError;
private static readonly Action<ILogger, Exception> _installingSuccess;
private static readonly Action<ILogger, Exception> _jobFailed;
private static readonly Action<ILogger, Exception> _jobFailedWillRetry;
private static readonly Action<ILogger, double, Exception> _jobExecuted;
private static readonly Action<ILogger, int, Exception> _jobRetrying;
private static readonly Action<ILogger, int, Exception> _jobCouldNotBeLoaded;
private static readonly Action<ILogger, string, Exception> _exceptionOccuredWhileExecutingJob;
static LoggerExtensions()
{
_collectingExpiredEntities = LoggerMessage.Define(
LogLevel.Debug,
1,
"Collecting expired entities.");
_installing = LoggerMessage.Define(
LogLevel.Debug,
1,
"Installing Jobs SQL objects...");
_installingError = LoggerMessage.Define(
LogLevel.Warning,
2,
"Exception occurred during automatic migration. Retrying...");
_installingSuccess = LoggerMessage.Define(
LogLevel.Debug,
3,
"Jobs SQL objects installed.");
_jobFailed = LoggerMessage.Define(
LogLevel.Warning,
1,
"Job failed to execute.");
_jobFailedWillRetry = LoggerMessage.Define(
LogLevel.Warning,
2,
"Job failed to execute. Will retry.");
_jobRetrying = LoggerMessage.Define<int>(
LogLevel.Debug,
3,
"Retrying a job: {Retries}...");
_jobExecuted = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Job executed. Took: {Seconds} secs.");
_jobCouldNotBeLoaded = LoggerMessage.Define<int>(
LogLevel.Warning,
5,
"Could not load a job: '{JobId}'.");
_exceptionOccuredWhileExecutingJob = LoggerMessage.Define<string>(
LogLevel.Error,
6,
"An exception occured while trying to execute a job: '{JobId}'. " +
"Requeuing for another retry.");
}
public static void CollectingExpiredEntities(this ILogger logger)
{
_collectingExpiredEntities(logger, null);
}
public static void Installing(this ILogger logger)
{
_installing(logger, null);
}
public static void InstallingError(this ILogger logger, Exception ex)
{
_installingError(logger, ex);
}
public static void InstallingSuccess(this ILogger logger)
{
_installingSuccess(logger, null);
}
public static void JobFailed(this ILogger logger, Exception ex)
{
_jobFailed(logger, ex);
}
public static void JobFailedWillRetry(this ILogger logger, Exception ex)
{
_jobFailedWillRetry(logger, ex);
}
public static void JobRetrying(this ILogger logger, int retries)
{
_jobRetrying(logger, retries, null);
}
public static void JobExecuted(this ILogger logger, double seconds)
{
_jobExecuted(logger, seconds, null);
}
public static void JobCouldNotBeLoaded(this ILogger logger, int jobId, Exception ex)
{
_jobCouldNotBeLoaded(logger, jobId, ex);
}
public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, string jobId, Exception ex)
{
_exceptionOccuredWhileExecutingJob(logger, jobId, ex);
}
}
}
\ No newline at end of file
using System;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
{
public class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly KafkaOptions _kafkaOptions;
public PublishQueueExecutor(IStateChanger stateChanger,
IOptions<KafkaOptions> options,
ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger)
{
_logger = logger;
_kafkaOptions = options.Value;
}
public override Task<OperateResult> PublishAsync(string keyName, string content)
{
try
{
var config = _kafkaOptions.AsRdkafkaConfig();
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
producer.ProduceAsync(keyName, null, content);
producer.Flush();
}
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");
return Task.FromResult(OperateResult.Success);
}
catch (Exception ex)
{
_logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");
return Task.FromResult(OperateResult.Failed(ex,
new OperateError()
{
Code = ex.HResult.ToString(),
Description = ex.Message
}));
}
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.RabbitMQ;
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapBuilderExtensions
{
public static CapBuilder AddRabbitMQ(this CapBuilder builder, Action<RabbitMQOptions> setupOptions)
{
if (setupOptions == null) throw new ArgumentNullException(nameof(setupOptions));
builder.Services.Configure(setupOptions);
builder.Services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
builder.Services.AddTransient<IJobProcessor, RabbitJobProcessor>();
return builder;
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
public static CapOptions UseRabbitMQ(this CapOptions options, string hostName)
{
return options.UseRabbitMQ(opt =>
{
opt.HostName = hostName;
});
}
public static CapOptions UseRabbitMQ(this CapOptions options, Action<RabbitMQOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
options.RegisterExtension(new RabbitMQCapOptionsExtension(configure));
return options;
}
}
}
\ No newline at end of file
namespace DotNetCore.CAP.RabbitMQ
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQOptions
{
......@@ -34,7 +35,7 @@
public string HostName { get; set; } = "localhost";
/// <summary> The topic exchange type. </summary>
internal string EXCHANGE_TYPE = "topic";
internal const string ExchangeType = "topic";
/// <summary>
/// Password to use when authenticating to the server.
......@@ -72,7 +73,7 @@
public int SocketWriteTimeout { get; set; } = DefaultConnectionTimeout;
/// <summary>
/// The port to connect on.
/// The port to connect on.
/// </summary>
public int Port { get; set; } = -1;
}
......
using System;
using DotNetCore.CAP.RabbitMQ;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<RabbitMQOptions> _configure;
public RabbitMQCapOptionsExtension(Action<RabbitMQOptions> configure)
{
_configure = configure;
}
public void AddServices(IServiceCollection services)
{
services.Configure(_configure);
var rabbitMQOptions = new RabbitMQOptions();
_configure(rabbitMQOptions);
services.AddSingleton(rabbitMQOptions);
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddTransient<IQueueExecutor, PublishQueueExecutor>();
}
}
}
\ No newline at end of file
using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP.RabbitMQ
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class CapSubscribeAttribute : TopicAttribute
{
public CapSubscribeAttribute(string routingKey) : base(routingKey)
public CapSubscribeAttribute(string name) : base(name)
{
}
}
}
\ No newline at end of file
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitJobProcessor : IJobProcessor
{
private readonly RabbitMQOptions _rabbitMqOptions;
private readonly CancellationTokenSource _cts;
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly TimeSpan _pollingDelay;
public RabbitJobProcessor(
IOptions<CapOptions> capOptions,
IOptions<RabbitMQOptions> rabbitMQOptions,
ILogger<RabbitJobProcessor> logger,
IServiceProvider provider)
{
_logger = logger;
_rabbitMqOptions = rabbitMQOptions.Value;
_provider = provider;
_cts = new CancellationTokenSource();
var capOptions1 = capOptions.Value;
_pollingDelay = TimeSpan.FromSeconds(capOptions1.PollingDelay);
}
public bool Waiting { get; private set; }
public Task ProcessAsync(ProcessingContext context)
{
if (context == null) throw new ArgumentNullException(nameof(context));
context.ThrowIfStopping();
return ProcessCoreAsync(context);
}
public async Task ProcessCoreAsync(ProcessingContext context)
{
try
{
var worked = await Step(context);
context.ThrowIfStopping();
Waiting = true;
if (!worked)
{
var token = GetTokenToWaitOn(context);
}
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay);
}
finally
{
Waiting = false;
}
}
protected virtual CancellationToken GetTokenToWaitOn(ProcessingContext context)
{
return context.CancellationToken;
}
private async Task<bool> Step(ProcessingContext context)
{
using (var scopedContext = context.CreateScope())
{
var provider = scopedContext.Provider;
var messageStore = provider.GetRequiredService<ICapMessageStore>();
var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync();
try
{
if (message != null)
{
var sp = Stopwatch.StartNew();
message.StatusName = StatusName.Processing;
await messageStore.UpdateSentMessageAsync(message);
ExecuteJob(message.KeyName, message.Content);
sp.Stop();
message.StatusName = StatusName.Succeeded;
await messageStore.UpdateSentMessageAsync(message);
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message?.KeyName, ex);
return false;
}
}
return true;
}
private void ExecuteJob(string topic, string content)
{
var factory = new ConnectionFactory()
{
HostName = _rabbitMqOptions.HostName,
UserName = _rabbitMqOptions.UserName,
Port = _rabbitMqOptions.Port,
Password = _rabbitMqOptions.Password,
VirtualHost = _rabbitMqOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMqOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMqOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMqOptions.SocketWriteTimeout
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_rabbitMqOptions.TopicExchangeName, _rabbitMqOptions.EXCHANGE_TYPE);
channel.BasicPublish(exchange: _rabbitMqOptions.TopicExchangeName,
routingKey: topic,
basicProperties: null,
body: body);
}
}
}
}
\ No newline at end of file
using System;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.RabbitMQ
{
internal static class LoggerExtensions
{
private static Action<ILogger, Exception> _collectingExpiredEntities;
private static Action<ILogger, Exception> _installing;
private static Action<ILogger, Exception> _installingError;
private static Action<ILogger, Exception> _installingSuccess;
private static Action<ILogger, Exception> _jobFailed;
private static Action<ILogger, Exception> _jobFailedWillRetry;
private static Action<ILogger, double, Exception> _jobExecuted;
private static Action<ILogger, int, Exception> _jobRetrying;
private static Action<ILogger, int, Exception> _jobCouldNotBeLoaded;
private static Action<ILogger, string, Exception> _exceptionOccuredWhileExecutingJob;
static LoggerExtensions()
{
_collectingExpiredEntities = LoggerMessage.Define(
LogLevel.Debug,
1,
"Collecting expired entities.");
_installing = LoggerMessage.Define(
LogLevel.Debug,
1,
"Installing Jobs SQL objects...");
_installingError = LoggerMessage.Define(
LogLevel.Warning,
2,
"Exception occurred during automatic migration. Retrying...");
_installingSuccess = LoggerMessage.Define(
LogLevel.Debug,
3,
"Jobs SQL objects installed.");
_jobFailed = LoggerMessage.Define(
LogLevel.Warning,
1,
"Job failed to execute.");
_jobFailedWillRetry = LoggerMessage.Define(
LogLevel.Warning,
2,
"Job failed to execute. Will retry.");
_jobRetrying = LoggerMessage.Define<int>(
LogLevel.Debug,
3,
"Retrying a job: {Retries}...");
_jobExecuted = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Job executed. Took: {Seconds} secs.");
_jobCouldNotBeLoaded = LoggerMessage.Define<int>(
LogLevel.Warning,
5,
"Could not load a job: '{JobId}'.");
_exceptionOccuredWhileExecutingJob = LoggerMessage.Define<string>(
LogLevel.Error,
6,
"An exception occured while trying to execute a job: '{JobId}'. " +
"Requeuing for another retry.");
}
public static void CollectingExpiredEntities(this ILogger logger)
{
_collectingExpiredEntities(logger, null);
}
public static void Installing(this ILogger logger)
{
_installing(logger, null);
}
public static void InstallingError(this ILogger logger, Exception ex)
{
_installingError(logger, ex);
}
public static void InstallingSuccess(this ILogger logger)
{
_installingSuccess(logger, null);
}
public static void JobFailed(this ILogger logger, Exception ex)
{
_jobFailed(logger, ex);
}
public static void JobFailedWillRetry(this ILogger logger, Exception ex)
{
_jobFailedWillRetry(logger, ex);
}
public static void JobRetrying(this ILogger logger, int retries)
{
_jobRetrying(logger, retries, null);
}
public static void JobExecuted(this ILogger logger, double seconds)
{
_jobExecuted(logger, seconds, null);
}
public static void JobCouldNotBeLoaded(this ILogger logger, int jobId, Exception ex)
{
_jobCouldNotBeLoaded(logger, jobId, ex);
}
public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, string jobId, Exception ex)
{
_exceptionOccuredWhileExecutingJob(logger, jobId, ex);
}
}
}
\ No newline at end of file
using System;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
{
public class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly RabbitMQOptions _rabbitMQOptions;
public PublishQueueExecutor(IStateChanger stateChanger,
IOptions<RabbitMQOptions> options,
ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger)
{
_logger = logger;
_rabbitMQOptions = options.Value;
}
public override Task<OperateResult> PublishAsync(string keyName, string content)
{
var factory = new ConnectionFactory()
{
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};
try
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(content);
channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType);
channel.BasicPublish(exchange: _rabbitMQOptions.TopicExchangeName,
routingKey: keyName,
basicProperties: null,
body: body);
_logger.LogDebug($"rabbitmq topic message [{keyName}] has been published.");
}
return Task.FromResult(OperateResult.Success);
}
catch (Exception ex)
{
_logger.LogError($"rabbitmq topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");
return Task.FromResult(OperateResult.Failed(ex,
new OperateError()
{
Code = ex.HResult.ToString(),
Description = ex.Message
}));
}
}
}
}
\ No newline at end of file
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
......@@ -45,18 +45,18 @@ namespace DotNetCore.CAP.RabbitMQ
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: _exchageName, type: _rabbitMQOptions.EXCHANGE_TYPE);
_channel.ExchangeDeclare(exchange: _exchageName, type: RabbitMQOptions.ExchangeType);
_channel.QueueDeclare(_queueName, exclusive: false);
}
public void Listening(TimeSpan timeout)
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived;
_channel.BasicConsume(_queueName, false, consumer);
while (true)
{
Task.Delay(timeout);
Task.Delay(timeout, cancellationToken).Wait();
}
}
......@@ -87,7 +87,7 @@ namespace DotNetCore.CAP.RabbitMQ
var message = new MessageContext
{
Group = _queueName,
KeyName = e.RoutingKey,
Name = e.RoutingKey,
Content = Encoding.UTF8.GetString(e.Body)
};
MessageReceieved?.Invoke(sender, message);
......
using System;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class EFOptions
{
public const string DefaultSchema = "Cap";
/// <summary>
/// Gets or sets the schema to use when creating database objects.
/// Default is <see cref="DefaultSchema"/>.
/// </summary>
public string Schema { get; set; } = DefaultSchema;
/// <summary>
/// EF dbcontext type.
/// </summary>
public Type DbContextType { get; internal set; }
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
public static CapOptions UseSqlServer(this CapOptions options, string connectionString)
{
return options.UseSqlServer(opt =>
{
opt.ConnectionString = connectionString;
});
}
public static CapOptions UseSqlServer(this CapOptions options, Action<SqlServerOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
options.RegisterExtension(new SqlServerCapOptionsExtension(configure));
return options;
}
public static CapOptions UseEntityFramework<TContext>(this CapOptions options)
where TContext : DbContext
{
return options.UseEntityFramework<TContext>(opt =>
{
opt.DbContextType = typeof(TContext);
});
}
public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
var efOptions = new EFOptions { DbContextType = typeof(TContext) };
configure(efOptions);
options.RegisterExtension(new SqlServerCapOptionsExtension(configure));
return options;
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.SqlServer;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class SqlServerCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<SqlServerOptions> _configure;
public SqlServerCapOptionsExtension(Action<SqlServerOptions> configure)
{
_configure = configure;
}
public void AddServices(IServiceCollection services)
{
services.AddSingleton<IStorage, SqlServerStorage>();
services.AddScoped<IStorageConnection, SqlServerStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();
var sqlServerOptions = new SqlServerOptions();
_configure(sqlServerOptions);
var provider = TempBuildService(services);
var dbContextObj = provider.GetService(sqlServerOptions.DbContextType);
if (dbContextObj != null)
{
var dbContext = (DbContext)dbContextObj;
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
services.Configure(_configure);
services.AddSingleton(sqlServerOptions);
}
private IServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
}
}
\ No newline at end of file
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class SqlServerOptions : EFOptions
{
/// <summary>
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
public string ConnectionString { get; set; }
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
namespace DotNetCore.CAP.SqlServer
{
public class CapPublisher : ICapPublisher
{
private readonly SqlServerOptions _options;
private readonly DbContext _dbContext;
protected bool IsUsingEF { get; }
protected IServiceProvider ServiceProvider { get; }
public CapPublisher(IServiceProvider provider, SqlServerOptions options)
{
ServiceProvider = provider;
_options = options;
if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
}
}
public Task PublishAsync(string name, string content)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
return Publish(name, content);
}
public Task PublishAsync<T>(string name, T contentObj)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
var content = Helper.ToJson(contentObj);
return Publish(name, content);
}
public Task PublishAsync(string name, string content, IDbConnection dbConnection)
{
if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
if (name == null) throw new ArgumentNullException(nameof(name));
if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection));
var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
return PublishWithTrans(name, content, dbConnection, dbTransaction);
}
public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
if (name == null) throw new ArgumentNullException(nameof(name));
if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection));
if (dbTransaction == null) throw new ArgumentNullException(nameof(dbTransaction));
return PublishWithTrans(name, content, dbConnection, dbTransaction);
}
private async Task Publish(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTrans(name, content, connection, dbTransaction);
}
private async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction);
PublishQueuer.PulseEvent.Set();
}
}
}
\ No newline at end of file
......@@ -4,8 +4,8 @@
<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<AssemblyName>DotNetCore.CAP.EntityFrameworkCore</AssemblyName>
<PackageId>DotNetCore.CAP.EntityFrameworkCore</PackageId>
<AssemblyName>DotNetCore.CAP.SqlServer</AssemblyName>
<PackageId>DotNetCore.CAP.SqlServer</PackageId>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50</PackageTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
......@@ -14,9 +14,9 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="System.ComponentModel.TypeConverter" Version="4.3.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
</ItemGroup>
<ItemGroup>
......
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer
{
public class FetchedMessage
{
public int MessageId { get; set; }
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
{
public class DefaultAdditionalProcessor : IAdditionalProcessor
{
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2);
private static readonly string[] Tables =
{
"Published","Received"
};
public DefaultAdditionalProcessor(
IServiceProvider provider,
ILogger<DefaultAdditionalProcessor> logger,
SqlServerOptions sqlServerOptions)
{
_logger = logger;
_provider = provider;
_options = sqlServerOptions;
}
public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug("Collecting expired entities.");
foreach (var table in Tables)
{
var removedCount = 0;
do
{
using (var connection = new SqlConnection(_options.ConnectionString))
{
removedCount = await connection.ExecuteAsync($@"
DELETE TOP (@count)
FROM [{_options.Schema}].[{table}] WITH (readpast)
WHERE ExpiresAt < @now;", new { now = DateTime.Now, count = MaxBatch });
}
if (removedCount != 0)
{
await context.WaitAsync(_delay);
context.ThrowIfStopping();
}
} while (removedCount != 0);
}
await context.WaitAsync(_waitingInterval);
}
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerFetchedMessage : IFetchedMessage
{
private readonly IDbConnection _connection;
private readonly IDbTransaction _transaction;
private readonly Timer _timer;
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly object _lockObject = new object();
public SqlServerFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}
public int MessageId { get; }
public MessageType MessageType { get; }
public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}
public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}
public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}
private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
}
}
}
\ No newline at end of file
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerStorage : IStorage
{
private readonly SqlServerOptions _options;
private readonly ILogger _logger;
public SqlServerStorage(ILogger<SqlServerStorage> logger, SqlServerOptions options)
{
_options = options;
_logger = logger;
}
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
var sql = CreateDbTablesScript(_options.Schema);
using (var connection = new SqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
}
protected virtual string CreateDbTablesScript(string schema)
{
var batchSql =
$@"
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')
BEGIN
EXEC('CREATE SCHEMA {schema}')
END;
IF OBJECT_ID(N'[{schema}].[Queue]',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Queue](
[MessageId] [int] NOT NULL,
[MessageType] [tinyint] NOT NULL
) ON [PRIMARY]
END;
IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Received](
[Id] [int] IDENTITY(1,1) NOT NULL,
[Name] [nvarchar](200) NOT NULL,
[Group] [nvarchar](200) NULL,
[Content] [nvarchar](max) NULL,
[Retries] [int] NOT NULL,
[Added] [datetime2](7) NOT NULL,
[ExpiresAt] [datetime2](7) NULL,
[StatusName] [nvarchar](50) NOT NULL,
CONSTRAINT [PK_{schema}.Received] PRIMARY KEY CLUSTERED
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
END;
IF OBJECT_ID(N'[{schema}].[Published]',N'U') IS NULL
BEGIN
CREATE TABLE [{schema}].[Published](
[Id] [int] IDENTITY(1,1) NOT NULL,
[Name] [nvarchar](200) NOT NULL,
[Content] [nvarchar](max) NULL,
[Retries] [int] NOT NULL,
[Added] [datetime2](7) NOT NULL,
[ExpiresAt] [datetime2](7) NULL,
[StatusName] [nvarchar](50) NOT NULL,
CONSTRAINT [PK_{schema}.Published] PRIMARY KEY CLUSTERED
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
END;";
return batchSql;
}
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerStorageConnection : IStorageConnection
{
private readonly SqlServerOptions _options;
public SqlServerStorageConnection(SqlServerOptions options)
{
_options = options;
}
public SqlServerOptions Options => _options;
public IStorageTransaction CreateTransaction()
{
return new SqlServerStorageTransaction(this);
}
public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{
var sql = $@"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}
public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"
DELETE TOP (1)
FROM [{_options.Schema}].[Queue] WITH (readpast, updlock, rowlock)
OUTPUT DELETED.MessageId,DELETED.[MessageType];";
return FetchNextMessageCoreAsync(sql);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
using (var connection = new SqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}
// CapReceviedMessage
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $@"
INSERT INTO [{_options.Schema}].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
using (var connection = new SqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
}
}
public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
{
var sql = $@"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
using (var connection = new SqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
public void Dispose()
{
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new SqlConnection(_options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage = null;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (SqlException)
{
transaction.Dispose();
throw;
}
if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}
return new SqlServerFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction);
}
}
}
\ No newline at end of file
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.SqlServer
{
public class SqlServerStorageTransaction : IStorageTransaction, IDisposable
{
private readonly string _schema;
private readonly IDbTransaction _dbTransaction;
private readonly IDbConnection _dbConnection;
public SqlServerStorageTransaction(SqlServerStorageConnection connection)
{
var options = connection.Options;
_schema = options.Schema;
_dbConnection = new SqlConnection(options.ConnectionString);
_dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}
public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction);
}
public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction);
}
public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}
public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}
\ No newline at end of file
......@@ -14,7 +14,7 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
/// <returns>
/// <para>
/// A <see cref="Task"/> which will complete when the model binding process completes.
/// </para>
/// </para>
/// </returns>
Task BindModelAsync(ModelBindingContext bindingContext);
}
......
......@@ -8,9 +8,9 @@ namespace DotNetCore.CAP.Abstractions
[AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, AllowMultiple = true)]
public abstract class TopicAttribute : Attribute
{
protected TopicAttribute(string topicName)
protected TopicAttribute(string name)
{
Name = topicName;
Name = name;
}
/// <summary>
......
using System;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP
......@@ -35,38 +34,6 @@ namespace DotNetCore.CAP
return this;
}
/// <summary>
/// Adds a singleton service of the type specified in serviceType with an implementation
/// </summary>
private CapBuilder AddSingleton<TService, TImplementation>()
where TService : class
where TImplementation : class, TService
{
Services.AddSingleton<TService, TImplementation>();
return this;
}
/// <summary>
/// Add an <see cref="ICapMessageStore"/> .
/// </summary>
/// <typeparam name="T">The type for the <see cref="ICapMessageStore"/> to add. </typeparam>
/// <returns>The current <see cref="CapBuilder"/> instance.</returns>
public virtual CapBuilder AddMessageStore<T>()
where T : class, ICapMessageStore
{
return AddScoped(typeof(ICapMessageStore), typeof(T));
}
/// <summary>
/// Add an <see cref="IJob"/> for process <see cref="CapJob"/>.
/// </summary>
/// <typeparam name="T">The type of the job.</typeparam>
public virtual CapBuilder AddJobs<T>()
where T : class, IJob
{
return AddSingleton<IJob, T>();
}
/// <summary>
/// Add an <see cref="ICapPublisher"/>.
/// </summary>
......
namespace DotNetCore.CAP
using System;
using System.Collections.Generic;
namespace DotNetCore.CAP
{
/// <summary>
/// Represents all the options you can use to configure the system.
/// </summary>
public class CapOptions
{
internal IList<ICapOptionsExtension> Extensions { get; private set; }
/// <summary>
/// Default value for polling delay timeout, in seconds.
/// </summary>
public const int DefaultPollingDelay = 8;
/// <summary>
/// Default value for CAP job.
/// </summary>
public const string DefaultCronExp = "* * * * *";
public CapOptions()
{
CronExp = DefaultCronExp;
PollingDelay = DefaultPollingDelay;
Extensions = new List<ICapOptionsExtension>();
}
/// <summary>
/// Corn expression for configuring retry cron job. Default is 1 min.
/// Productor job polling delay time. Default is 8 sec.
/// </summary>
public string CronExp { get; set; }
public int PollingDelay { get; set; } = 8;
/// <summary>
/// Productor job polling delay time. Default is 8 sec.
/// We’ll send a POST request to the URL below with details of any subscribed events.
/// </summary>
public int PollingDelay { get; set; } = 8;
public WebHook WebHook { get; set; }
/// <summary>
/// Registers an extension that will be executed when building services.
/// </summary>
/// <param name="extension"></param>
public void RegisterExtension(ICapOptionsExtension extension)
{
if (extension == null)
throw new ArgumentNullException(nameof(extension));
Extensions.Add(extension);
}
}
public class WebHook
{
public string PayloadUrl { get; set; }
public string Secret { get; set; }
}
}
\ No newline at end of file
......@@ -6,7 +6,8 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection.Extensions;
namespace Microsoft.Extensions.DependencyInjection
......@@ -16,16 +17,6 @@ namespace Microsoft.Extensions.DependencyInjection
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Adds and configures the CAP services for the consitence.
/// </summary>
/// <param name="services">The services available in the application.</param>
/// <returns>An <see cref="CapBuilder"/> for application services.</returns>
public static CapBuilder AddCap(this IServiceCollection services)
{
return services.AddCap(x => new CapOptions());
}
/// <summary>
/// Adds and configures the consistence services for the consitence.
/// </summary>
......@@ -36,10 +27,12 @@ namespace Microsoft.Extensions.DependencyInjection
this IServiceCollection services,
Action<CapOptions> setupAction)
{
if (setupAction == null) throw new ArgumentNullException(nameof(setupAction));
services.TryAddSingleton<CapMarkerService>();
services.Configure(setupAction);
AddConsumerServices(services);
AddSubscribeServices(services);
services.TryAddSingleton<IConsumerServiceSelector, DefaultConsumerServiceSelector>();
services.TryAddSingleton<IModelBinder, DefaultModelBinder>();
......@@ -47,19 +40,32 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<MethodMatcherCache>();
services.AddSingleton<IProcessingServer, ConsumerHandler>();
services.AddSingleton<IProcessingServer, JobProcessingServer>();
services.AddSingleton<IProcessingServer, CapProcessingServer>();
services.AddSingleton<IBootstrapper, DefaultBootstrapper>();
services.AddSingleton<IStateChanger, StateChanger>();
services.TryAddTransient<IJobProcessor, CronJobProcessor>();
services.TryAddSingleton<IJob, CapJob>();
services.TryAddTransient<DefaultCronJobRegistry>();
//Processors
services.AddTransient<PublishQueuer>();
services.AddTransient<SubscribeQueuer>();
services.AddTransient<IDispatcher, DefaultDispatcher>();
services.TryAddScoped<ICapPublisher, DefaultCapPublisher>();
//Executors
services.AddSingleton<IQueueExecutorFactory, QueueExecutorFactory>();
services.AddSingleton<IQueueExecutor, SubscibeQueueExecutor>();
//Options and extension service
var options = new CapOptions();
setupAction(options);
foreach (var serviceExtension in options.Extensions)
{
serviceExtension.AddServices(services);
}
services.AddSingleton(options);
return new CapBuilder(services);
}
private static void AddConsumerServices(IServiceCollection services)
private static void AddSubscribeServices(IServiceCollection services)
{
var consumerListenerServices = new Dictionary<Type, Type>();
foreach (var rejectedServices in services)
......
......@@ -12,7 +12,7 @@
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<AllowUnsafeBlocks>False</AllowUnsafeBlocks>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="1.1.2" />
......@@ -20,8 +20,8 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="ncrontab" Version="3.3.0" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
<PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" />
</ItemGroup>
......
......@@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
......@@ -24,7 +23,7 @@ namespace DotNetCore.CAP
public DefaultBootstrapper(
ILogger<DefaultBootstrapper> logger,
IOptions<CapOptions> options,
ICapMessageStore storage,
IStorage storage,
IApplicationLifetime appLifetime,
IServiceProvider provider)
{
......@@ -52,7 +51,7 @@ namespace DotNetCore.CAP
protected CapOptions Options { get; }
protected ICapMessageStore Storage { get; }
protected IStorage Storage { get; }
protected IEnumerable<IProcessingServer> Servers { get; }
......@@ -65,7 +64,7 @@ namespace DotNetCore.CAP
private async Task BootstrapTaskAsync()
{
if (_cts.IsCancellationRequested) return;
await Storage.InitializeAsync(_cts.Token);
if (_cts.IsCancellationRequested) return;
......@@ -98,7 +97,7 @@ namespace DotNetCore.CAP
item.Dispose();
}
});
return Task.FromResult(0);
return Task.CompletedTask;
}
}
}
\ No newline at end of file
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP
{
/// <summary>
/// Provides an abstraction for a store which manages CAP message.
/// </summary>
public interface ICapMessageStore
{
/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to create in the store.</param>
Task<OperateResult> StoreSentMessageAsync(CapSentMessage message);
/// <summary>
/// Change <see cref="CapSentMessage"/> model status name.
/// </summary>
/// <param name="message">The type of <see cref="CapSentMessage"/>.</param>
/// <param name="statusName">The status name.</param>
/// <param name="autoSaveChanges">auto save dbcontext changes.</param>
/// <returns></returns>
Task<OperateResult> ChangeSentMessageStateAsync(CapSentMessage message, string statusName,
bool autoSaveChanges = true);
/// <summary>
/// Fetches the next message to be executed.
/// </summary>
/// <returns></returns>
Task<CapSentMessage> GetNextSentMessageToBeEnqueuedAsync();
/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
Task<OperateResult> UpdateSentMessageAsync(CapSentMessage message);
/// <summary>
/// Deletes a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to delete in the store.</param>
Task<OperateResult> RemoveSentMessageAsync(CapSentMessage message);
/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
Task<OperateResult> StoreReceivedMessageAsync(CapReceivedMessage message);
/// <summary>
/// Change <see cref="CapReceivedMessage"/> model status name.
/// </summary>
/// <param name="message">The type of <see cref="CapReceivedMessage"/>.</param>
/// <param name="statusName">The status name.</param>
/// <param name="autoSaveChanges">auto save dbcontext changes.</param>
/// <returns></returns>
Task<OperateResult> ChangeReceivedMessageStateAsync(CapReceivedMessage message, string statusName,
bool autoSaveChanges = true);
/// <summary>
/// Fetches the next message to be executed.
/// </summary>
Task<CapReceivedMessage> GetNextReceivedMessageToBeExcuted();
/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
Task<OperateResult> UpdateReceivedMessageAsync(CapReceivedMessage message);
}
}
\ No newline at end of file
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP
{
public interface ICapOptionsExtension
{
void AddServices(IServiceCollection services);
}
}
\ No newline at end of file
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP
{
/// <summary>
/// Cap <see cref="ICapPublisher"/> default implement.
/// </summary>
public class DefaultCapPublisher : ICapPublisher
{
private readonly ICapMessageStore _store;
private readonly ILogger _logger;
public DefaultCapPublisher(
ICapMessageStore store,
ILogger<DefaultCapPublisher> logger)
{
_store = store;
_logger = logger;
}
public Task PublishAsync(string topic, string content)
{
if (topic == null) throw new ArgumentNullException(nameof(topic));
if (content == null) throw new ArgumentNullException(nameof(content));
return StoreMessage(topic, content);
}
public Task PublishAsync<T>(string topic, T contentObj)
{
if (topic == null) throw new ArgumentNullException(nameof(topic));
var content = Helper.ToJson(contentObj);
if (content == null)
throw new InvalidCastException(nameof(contentObj));
return StoreMessage(topic, content);
}
private async Task StoreMessage(string topic, string content)
{
var message = new CapSentMessage
{
KeyName = topic,
Content = content,
StatusName = StatusName.Enqueued
};
await _store.StoreSentMessageAsync(message);
WaitHandleEx.PulseEvent.Set();
_logger.EnqueuingSentMessage(topic, content);
}
}
}
\ No newline at end of file
using System.Threading.Tasks;
using System.Data;
using System.Threading.Tasks;
namespace DotNetCore.CAP
{
......@@ -9,18 +10,42 @@ namespace DotNetCore.CAP
{
/// <summary>
/// Publish a string message to specified topic.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
/// </para>
/// </summary>
/// <param name="topic">the topic name or exchange router key.</param>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
Task PublishAsync(string topic, string content);
Task PublishAsync(string name, string content);
/// <summary>
/// Publis a object message to specified topic.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
/// </para>
/// </summary>
/// <typeparam name="T">The type of conetent object.</typeparam>
/// <param name="topic">the topic name or exchange router key.</param>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">object instance that will be serialized of json.</param>
/// <returns></returns>
Task PublishAsync<T>(string topic, T contentObj);
Task PublishAsync<T>(string name, T contentObj);
/// <summary>
/// Publish a string message to specified topic with transacton.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
/// <param name="dbConnection">the dbConnection of <see cref="IDbConnection"/></param>
Task PublishAsync(string name, string content, IDbConnection dbConnection);
/// <summary>
/// Publish a string message to specified topic with transacton.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction);
}
}
\ No newline at end of file
using System;
using System.Threading;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP
......@@ -12,7 +13,7 @@ namespace DotNetCore.CAP
void Subscribe(string topic, int partition);
void Listening(TimeSpan timeout);
void Listening(TimeSpan timeout, CancellationToken cancellationToken);
void Commit();
......
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
......@@ -16,12 +16,11 @@ namespace DotNetCore.CAP
private readonly IServiceProvider _serviceProvider;
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IConsumerClientFactory _consumerClientFactory;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private readonly CancellationTokenSource _cts;
private readonly MethodMatcherCache _selector;
private readonly CapOptions _options;
private readonly CancellationTokenSource _cts;
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1);
......@@ -32,13 +31,12 @@ namespace DotNetCore.CAP
IServiceProvider serviceProvider,
IConsumerInvokerFactory consumerInvokerFactory,
IConsumerClientFactory consumerClientFactory,
ILoggerFactory loggerFactory,
ILogger<ConsumerHandler> logger,
MethodMatcherCache selector,
IOptions<CapOptions> options)
{
_selector = selector;
_logger = loggerFactory.CreateLogger<ConsumerHandler>();
_loggerFactory = loggerFactory;
_logger = logger;
_serviceProvider = serviceProvider;
_consumerInvokerFactory = consumerInvokerFactory;
_consumerClientFactory = consumerClientFactory;
......@@ -63,9 +61,9 @@ namespace DotNetCore.CAP
client.Subscribe(item.Attribute.Name);
}
client.Listening(_pollingDelay);
client.Listening(_pollingDelay, _cts.Token);
}
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
_compositeTask = Task.CompletedTask;
}
......@@ -83,7 +81,7 @@ namespace DotNetCore.CAP
try
{
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds);
_compositeTask.Wait(TimeSpan.FromSeconds(60));
}
catch (AggregateException ex)
{
......@@ -99,56 +97,32 @@ namespace DotNetCore.CAP
{
client.MessageReceieved += (sender, message) =>
{
_logger.EnqueuingReceivedMessage(message.KeyName, message.Content);
_logger.EnqueuingReceivedMessage(message.Name, message.Content);
using (var scope = _serviceProvider.CreateScope())
{
var receviedMessage = StoreMessage(scope, message);
client.Commit();
ProcessMessage(scope, receviedMessage);
}
Pulse();
};
}
private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext)
{
var provider = serviceScope.ServiceProvider;
var messageStore = provider.GetRequiredService<ICapMessageStore>();
var messageStore = provider.GetRequiredService<IStorageConnection>();
var receivedMessage = new CapReceivedMessage(messageContext)
{
StatusName = StatusName.Enqueued,
StatusName = StatusName.Scheduled,
};
messageStore.StoreReceivedMessageAsync(receivedMessage).Wait();
return receivedMessage;
}
private void ProcessMessage(IServiceScope serviceScope, CapReceivedMessage receivedMessage)
public void Pulse()
{
var provider = serviceScope.ServiceProvider;
var messageStore = provider.GetRequiredService<ICapMessageStore>();
try
{
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.KeyName);
if (executeDescriptorGroup.ContainsKey(receivedMessage.Group))
{
messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Processing).Wait();
// If there are multiple consumers in the same group, we will take the first
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0];
var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext());
_consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync();
messageStore.ChangeReceivedMessageStateAsync(receivedMessage, StatusName.Succeeded).Wait();
}
}
catch (Exception ex)
{
_logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.KeyName}", ex);
}
SubscribeQueuer.PulseEvent.Set();
}
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
public interface IFetchedMessage : IDisposable
{
int MessageId { get; }
MessageType MessageType { get; }
void RemoveFromQueue();
void Requeue();
}
}
\ No newline at end of file
......@@ -7,6 +7,8 @@ namespace DotNetCore.CAP
/// </summary>
public interface IProcessingServer : IDisposable
{
void Pulse();
void Start();
}
}
\ No newline at end of file
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP
{
public abstract class BasePublishQueueExecutor : IQueueExecutor
{
private readonly IStateChanger _stateChanger;
private readonly ILogger _logger;
protected BasePublishQueueExecutor(IStateChanger stateChanger,
ILogger<BasePublishQueueExecutor> logger)
{
_stateChanger = stateChanger;
_logger = logger;
}
public abstract Task<OperateResult> PublishAsync(string keyName, string content);
public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched)
{
var message = await connection.GetPublishedMessageAsync(fetched.MessageId);
try
{
var sp = Stopwatch.StartNew();
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
if (message.Retries > 0)
{
_logger.JobRetrying(message.Retries);
}
var result = await PublishAsync(message.Name, message.Content);
sp.Stop();
var newState = default(IState);
if (!result.Succeeded)
{
var shouldRetry = await UpdateMessageForRetryAsync(message, connection);
if (shouldRetry)
{
newState = new ScheduledState();
_logger.JobFailedWillRetry(result.Exception);
}
else
{
newState = new FailedState();
_logger.JobFailed(result.Exception);
}
}
else
{
newState = new SucceededState();
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
fetched.RemoveFromQueue();
if (result.Succeeded)
{
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
return OperateResult.Success;
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex);
return OperateResult.Failed(ex);
}
}
private async Task<bool> UpdateMessageForRetryAsync(CapPublishedMessage message, IStorageConnection connection)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var now = DateTime.Now;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);
await transaction.CommitAsync();
}
return true;
}
}
}
\ No newline at end of file
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP
{
public class SubscibeQueueExecutor : IQueueExecutor
{
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IStateChanger _stateChanger;
private readonly ILogger _logger;
private readonly MethodMatcherCache _selector;
public SubscibeQueueExecutor(
IStateChanger stateChanger,
MethodMatcherCache selector,
IConsumerInvokerFactory consumerInvokerFactory,
ILogger<BasePublishQueueExecutor> logger)
{
_selector = selector;
_consumerInvokerFactory = consumerInvokerFactory;
_stateChanger = stateChanger;
_logger = logger;
}
public async Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage fetched)
{
var message = await connection.GetReceivedMessageAsync(fetched.MessageId);
try
{
var sp = Stopwatch.StartNew();
await _stateChanger.ChangeStateAsync(message, new ProcessingState(), connection);
if (message.Retries > 0)
{
_logger.JobRetrying(message.Retries);
}
var result = await ExecuteSubscribeAsync(message);
sp.Stop();
var newState = default(IState);
if (!result.Succeeded)
{
var shouldRetry = await UpdateMessageForRetryAsync(message, connection);
if (shouldRetry)
{
newState = new ScheduledState();
_logger.JobFailedWillRetry(result.Exception);
}
else
{
newState = new FailedState();
_logger.JobFailed(result.Exception);
}
}
else
{
newState = new SucceededState();
}
await _stateChanger.ChangeStateAsync(message, newState, connection);
fetched.RemoveFromQueue();
if (result.Succeeded)
{
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
return OperateResult.Success;
}
catch (SubscriberNotFoundException ex)
{
_logger.LogError(ex.Message);
return OperateResult.Failed(ex);
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message?.Name, ex);
return OperateResult.Failed(ex);
}
}
protected virtual async Task<OperateResult> ExecuteSubscribeAsync(CapReceivedMessage receivedMessage)
{
try
{
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name);
if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group))
{
throw new SubscriberNotFoundException(receivedMessage.Name + " has not been found.");
}
// If there are multiple consumers in the same group, we will take the first
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0];
var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext());
await _consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync();
return OperateResult.Success;
}
catch (SubscriberNotFoundException ex)
{
_logger.LogError("Can not be found subscribe method of name: " + receivedMessage.Name);
return OperateResult.Failed(ex);
}
catch (Exception ex)
{
_logger.ConsumerMethodExecutingFailed($"Group:{receivedMessage.Group}, Topic:{receivedMessage.Name}", ex);
return OperateResult.Failed(ex);
}
}
private async Task<bool> UpdateMessageForRetryAsync(CapReceivedMessage message, IStorageConnection connection)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var now = DateTime.Now;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;
using (var transaction = connection.CreateTransaction())
{
transaction.UpdateMessage(message);
await transaction.CommitAsync();
}
return true;
}
}
}
\ No newline at end of file
using System.Threading.Tasks;
namespace DotNetCore.CAP
{
public interface IQueueExecutor
{
Task<OperateResult> ExecuteAsync(IStorageConnection connection, IFetchedMessage message);
}
}
\ No newline at end of file
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
public interface IQueueExecutorFactory
{
IQueueExecutor GetInstance(MessageType messageType);
}
}
\ No newline at end of file
using System.Threading;
using System.Threading.Tasks;
namespace DotNetCore.CAP
{
/// <summary>
/// Represents a persisted storage.
/// </summary>
public interface IStorage
{
/// <summary>
/// Initializes the storage. For example, making sure a database is created and migrations are applied.
/// </summary>
Task InitializeAsync(CancellationToken cancellationToken);
}
}
\ No newline at end of file
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
/// <summary>
/// Represents a connection to the storage.
/// </summary>
public interface IStorageConnection : IDisposable
{
//Sent messages
/// <summary>
/// Returns the message with the given id.
/// </summary>
/// <param name="id">The message's id.</param>
Task<CapPublishedMessage> GetPublishedMessageAsync(int id);
/// <summary>
/// Fetches the next message to be executed.
/// </summary>
Task<IFetchedMessage> FetchNextMessageAsync();
/// <summary>
/// Returns the next message to be enqueued.
/// </summary>
Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync();
// Received messages
/// <summary>
/// Stores the message.
/// </summary>
/// <param name="message">The message to store.</param>
Task StoreReceivedMessageAsync(CapReceivedMessage message);
/// <summary>
/// Returns the message with the given id.
/// </summary>
/// <param name="id">The message's id.</param>
Task<CapReceivedMessage> GetReceivedMessageAsync(int id);
/// <summary>
/// Returns the next message to be enqueued.
/// </summary>
Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync();
//-----------------------------------------
/// <summary>
/// Creates and returns an <see cref="IStorageTransaction"/>.
/// </summary>
IStorageTransaction CreateTransaction();
}
}
\ No newline at end of file
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
{
public interface IStorageTransaction : IDisposable
{
void UpdateMessage(CapPublishedMessage message);
void UpdateMessage(CapReceivedMessage message);
void EnqueueMessage(CapPublishedMessage message);
void EnqueueMessage(CapReceivedMessage message);
Task CommitAsync();
}
}
\ No newline at end of file
......@@ -4,9 +4,9 @@ using Newtonsoft.Json;
namespace DotNetCore.CAP.Infrastructure
{
internal static class Helper
public static class Helper
{
private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Local);
private static JsonSerializerSettings _serializerSettings;
public static void SetSerializerSettings(JsonSerializerSettings setting)
......@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.Infrastructure
public static long ToTimestamp(DateTime value)
{
var elapsedTime = value - Epoch;
return (long) elapsedTime.TotalSeconds;
return (long)elapsedTime.TotalSeconds;
}
public static DateTime FromTimestamp(long value)
......
using System;
using System.Collections.Generic;
using System.Text;
namespace DotNetCore.CAP.Infrastructure
namespace DotNetCore.CAP.Infrastructure
{
/// <summary>
/// The message status name.
/// </summary>
public struct StatusName
{
public const string Scheduled = nameof(Scheduled);
public const string Enqueued = nameof(Enqueued);
public const string Processing = nameof(Processing);
public const string Succeeded = nameof(Succeeded);
public const string Failed = nameof(Failed);
}
}
}
\ No newline at end of file
......@@ -6,8 +6,6 @@ namespace DotNetCore.CAP.Infrastructure
{
public static class WaitHandleEx
{
public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public static Task WaitAnyAsync(WaitHandle handle1, WaitHandle handle2, TimeSpan timeout)
{
var t1 = handle1.WaitOneAsync(timeout);
......@@ -23,7 +21,7 @@ namespace DotNetCore.CAP.Infrastructure
var tcs = new TaskCompletionSource<bool>();
registeredHandle = ThreadPool.RegisterWaitForSingleObject(
handle,
(state, timedOut) => ((TaskCompletionSource<bool>) state).TrySetResult(!timedOut),
(state, timedOut) => ((TaskCompletionSource<bool>)state).TrySetResult(!timedOut),
tcs,
timeout,
true);
......
using System;
using System.Collections.Generic;
using System.Text;
using Xunit;
namespace DotNetCore.CAP.EntityFrameworkCore.Test
namespace DotNetCore.CAP.Infrastructure
{
public class CapMessageStoreTest
public class WebHookProvider
{
public WebHookProvider()
{
throw new NotImplementedException();
}
}
}
\ No newline at end of file
}
using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP.Infrastructure
namespace DotNetCore.CAP.Internal
{
public interface IConsumerInvokerFactory
{
......
......@@ -44,7 +44,6 @@ namespace DotNetCore.CAP.Internal
return executorDescriptorList;
}
private static IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(
IServiceProvider provider)
{
......
using System;
using System.Linq;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP.Internal
......@@ -41,7 +41,7 @@ namespace DotNetCore.CAP.Internal
/// <summary>
/// Get a dictionary of specify topic candidates.
/// The Key is Group name, the value is specify topic candidates.
/// The Key is Group name, the value is specify topic candidates.
/// </summary>
/// <param name="topicName">message topic name</param>
public IDictionary<string, IList<ConsumerExecutorDescriptor>> GetTopicExector(string topicName)
......
......@@ -130,7 +130,7 @@ namespace DotNetCore.CAP.Internal
private static ConsumerMethodExecutor WrapVoidAction(VoidActionExecutor executor)
{
return delegate(object target, object[] parameters)
return delegate (object target, object[] parameters)
{
executor(target, parameters);
return null;
......@@ -192,7 +192,7 @@ namespace DotNetCore.CAP.Internal
/// </summary>
private static async Task<object> CastToObject<T>(Task<T> task)
{
return (object) await task;
return (object)await task;
}
private static Type GetTaskInnerTypeOrNull(Type type)
......@@ -279,7 +279,7 @@ namespace DotNetCore.CAP.Internal
private static Task<object> Convert<T>(object taskAsObject)
{
var task = (Task<T>) taskAsObject;
var task = (Task<T>)taskAsObject;
return CastToObject<T>(task);
}
......
using System;
namespace DotNetCore.CAP.Internal
{
public class SubscriberNotFoundException : Exception
{
public SubscriberNotFoundException()
{
}
public SubscriberNotFoundException(string message) : base(message)
{
}
public SubscriberNotFoundException(string message, Exception inner) :
base(message, inner)
{ }
}
}
\ No newline at end of file
using System;
using NCrontab;
namespace DotNetCore.CAP.Job
{
public class ComputedCronJob
{
private readonly CronJobRegistry.Entry _entry;
public ComputedCronJob()
{
}
public ComputedCronJob(CronJob job)
{
Job = job;
Schedule = CrontabSchedule.Parse(job.Cron);
if (job.TypeName != null)
{
JobType = Type.GetType(job.TypeName);
}
}
public ComputedCronJob(CronJob job, CronJobRegistry.Entry entry)
: this(job)
{
_entry = entry;
}
public CronJob Job { get; set; }
public CrontabSchedule Schedule { get; set; }
public Type JobType { get; set; }
public DateTime Next { get; set; }
public int Retries { get; set; }
public DateTime FirstTry { get; set; }
public RetryBehavior RetryBehavior => _entry.RetryBehavior;
public void Update(DateTime baseTime)
{
Job.LastRun = baseTime;
}
public void UpdateNext(DateTime now)
{
var next = Schedule.GetNextOccurrence(now);
var previousNext = Schedule.GetNextOccurrence(Job.LastRun);
Next = next > previousNext ? now : next;
}
}
}
\ No newline at end of file
using System;
namespace DotNetCore.CAP.Job
{
public class Cron
{
/// <summary>
/// Returns cron expression that fires every minute.
/// </summary>
public static string Minutely()
{
return "* * * * *";
}
/// <summary>
/// Returns cron expression that fires every hour at the first minute.
/// </summary>
public static string Hourly()
{
return Hourly(minute: 0);
}
/// <summary>
/// Returns cron expression that fires every hour at the specified minute.
/// </summary>
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Hourly(int minute)
{
return string.Format("{0} * * * *", minute);
}
/// <summary>
/// Returns cron expression that fires every day at 00:00 UTC.
/// </summary>
public static string Daily()
{
return Daily(hour: 0);
}
/// <summary>
/// Returns cron expression that fires every day at the first minute of
/// the specified hour in UTC.
/// </summary>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
public static string Daily(int hour)
{
return Daily(hour, minute: 0);
}
/// <summary>
/// Returns cron expression that fires every day at the specified hour and minute
/// in UTC.
/// </summary>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Daily(int hour, int minute)
{
return string.Format("{0} {1} * * *", minute, hour);
}
/// <summary>
/// Returns cron expression that fires every week at Monday, 00:00 UTC.
/// </summary>
public static string Weekly()
{
return Weekly(DayOfWeek.Monday);
}
/// <summary>
/// Returns cron expression that fires every week at 00:00 UTC of the specified
/// day of the week.
/// </summary>
/// <param name="dayOfWeek">The day of week in which the schedule will be activated.</param>
public static string Weekly(DayOfWeek dayOfWeek)
{
return Weekly(dayOfWeek, hour: 0);
}
/// <summary>
/// Returns cron expression that fires every week at the first minute
/// of the specified day of week and hour in UTC.
/// </summary>
/// <param name="dayOfWeek">The day of week in which the schedule will be activated.</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
public static string Weekly(DayOfWeek dayOfWeek, int hour)
{
return Weekly(dayOfWeek, hour, minute: 0);
}
/// <summary>
/// Returns cron expression that fires every week at the specified day
/// of week, hour and minute in UTC.
/// </summary>
/// <param name="dayOfWeek">The day of week in which the schedule will be activated.</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Weekly(DayOfWeek dayOfWeek, int hour, int minute)
{
return string.Format("{0} {1} * * {2}", minute, hour, (int) dayOfWeek);
}
/// <summary>
/// Returns cron expression that fires every month at 00:00 UTC of the first
/// day of month.
/// </summary>
public static string Monthly()
{
return Monthly(day: 1);
}
/// <summary>
/// Returns cron expression that fires every month at 00:00 UTC of the specified
/// day of month.
/// </summary>
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
public static string Monthly(int day)
{
return Monthly(day, hour: 0);
}
/// <summary>
/// Returns cron expression that fires every month at the first minute of the
/// specified day of month and hour in UTC.
/// </summary>
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
public static string Monthly(int day, int hour)
{
return Monthly(day, hour, minute: 0);
}
/// <summary>
/// Returns cron expression that fires every month at the specified day of month,
/// hour and minute in UTC.
/// </summary>
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Monthly(int day, int hour, int minute)
{
return string.Format("{0} {1} {2} * *", minute, hour, day);
}
/// <summary>
/// Returns cron expression that fires every year on Jan, 1st at 00:00 UTC.
/// </summary>
public static string Yearly()
{
return Yearly(month: 1);
}
/// <summary>
/// Returns cron expression that fires every year in the first day at 00:00 UTC
/// of the specified month.
/// </summary>
/// <param name="month">The month in which the schedule will be activated (1-12).</param>
public static string Yearly(int month)
{
return Yearly(month, day: 1);
}
/// <summary>
/// Returns cron expression that fires every year at 00:00 UTC of the specified
/// month and day of month.
/// </summary>
/// <param name="month">The month in which the schedule will be activated (1-12).</param>
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
public static string Yearly(int month, int day)
{
return Yearly(month, day, hour: 0);
}
/// <summary>
/// Returns cron expression that fires every year at the first minute of the
/// specified month, day and hour in UTC.
/// </summary>
/// <param name="month">The month in which the schedule will be activated (1-12).</param>
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
public static string Yearly(int month, int day, int hour)
{
return Yearly(month, day, hour, minute: 0);
}
/// <summary>
/// Returns cron expression that fires every year at the specified month, day,
/// hour and minute in UTC.
/// </summary>
/// <param name="month">The month in which the schedule will be activated (1-12).</param>
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Yearly(int month, int day, int hour, int minute)
{
return $"{minute} {hour} {day} {month} *";
}
}
}
\ No newline at end of file
using System;
namespace DotNetCore.CAP.Job
{
/// <summary>
/// Represents a cron job to be executed at specified intervals of time.
/// </summary>
public class CronJob
{
public CronJob()
{
Id = Guid.NewGuid().ToString();
}
public CronJob(string cron)
: this()
{
Cron = cron;
}
public CronJob(string cron, DateTime lastRun)
: this(cron)
{
LastRun = lastRun;
}
public string Id { get; set; }
public string Name { get; set; }
public string TypeName { get; set; }
public string Cron { get; set; }
public DateTime LastRun { get; set; }
}
}
\ No newline at end of file
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Job
{
public class DefaultCronJobRegistry : CronJobRegistry
{
public DefaultCronJobRegistry(IOptions<CapOptions> options)
{
var options1 = options.Value;
RegisterJob<CapJob>(nameof(DefaultCronJobRegistry), options1.CronExp, RetryBehavior.DefaultRetry);
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Reflection;
using NCrontab;
namespace DotNetCore.CAP.Job
{
public abstract class CronJobRegistry
{
private readonly List<Entry> _entries;
protected CronJobRegistry()
{
_entries = new List<Entry>();
}
protected void RegisterJob<T>(string name, string cron, RetryBehavior retryBehavior = null)
where T : IJob
{
RegisterJob(name, typeof(T), cron, retryBehavior);
}
/// <summary>
/// Registers a cron job.
/// </summary>
/// <param name="name">The name of the job.</param>
/// <param name="jobType">The job's type.</param>
/// <param name="cron">The cron expression to use.</param>
/// <param name="retryBehavior">The <see cref="RetryBehavior"/> to use.</param>
protected void RegisterJob(string name, Type jobType, string cron, RetryBehavior retryBehavior = null)
{
if (string.IsNullOrWhiteSpace(name)) throw new ArgumentException(nameof(cron));
if (jobType == null) throw new ArgumentNullException(nameof(jobType));
if (cron == null) throw new ArgumentNullException(nameof(cron));
retryBehavior = retryBehavior ?? RetryBehavior.DefaultRetry;
CrontabSchedule.TryParse(cron);
if (!typeof(IJob).GetTypeInfo().IsAssignableFrom(jobType))
{
throw new ArgumentException(
"Cron jobs should extend IJob.", nameof(jobType));
}
_entries.Add(new Entry(name, jobType, cron));
}
public Entry[] Build() => _entries.ToArray();
public class Entry
{
public Entry(string name, Type jobType, string cron)
{
Name = name;
JobType = jobType;
Cron = cron;
}
public string Name { get; set; }
public Type JobType { get; set; }
public string Cron { get; set; }
public RetryBehavior RetryBehavior { get; set; }
}
}
}
\ No newline at end of file
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Job
{
public class CapJob : IJob
{
private readonly MethodMatcherCache _selector;
private readonly IConsumerInvokerFactory _consumerInvokerFactory;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<CapJob> _logger;
private readonly ICapMessageStore _messageStore;
public CapJob(
ILogger<CapJob> logger,
IServiceProvider serviceProvider,
IConsumerInvokerFactory consumerInvokerFactory,
ICapMessageStore messageStore,
MethodMatcherCache selector)
{
_logger = logger;
_serviceProvider = serviceProvider;
_consumerInvokerFactory = consumerInvokerFactory;
_messageStore = messageStore;
_selector = selector;
}
public async Task ExecuteAsync()
{
var groupedCandidates = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider);
using (var scope = _serviceProvider.CreateScope())
{
var provider = scope.ServiceProvider;
var messageStore = provider.GetService<ICapMessageStore>();
var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted();
if (nextReceivedMessage != null && groupedCandidates.ContainsKey(nextReceivedMessage.Group))
{
try
{
await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing);
// If there are multiple consumers in the same group, we will take the first
var executeDescriptor = groupedCandidates[nextReceivedMessage.Group][0];
var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage.ToMessageContext());
var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);
await invoker.InvokeAsync();
await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Succeeded);
}
catch (Exception ex)
{
_logger.ReceivedMessageRetryExecutingFailed(nextReceivedMessage.KeyName, ex);
}
}
}
}
}
}
\ No newline at end of file
using System.Threading.Tasks;
namespace DotNetCore.CAP.Job
{
public interface IJob
{
/// <summary>
/// Executes the job.
/// </summary>
Task ExecuteAsync();
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Job
{
public class CronJobProcessor : IJobProcessor
{
private readonly ILogger _logger;
private IServiceProvider _provider;
private readonly DefaultCronJobRegistry _jobRegistry;
public CronJobProcessor(
DefaultCronJobRegistry jobRegistry,
ILogger<CronJobProcessor> logger,
IServiceProvider provider)
{
_jobRegistry = jobRegistry;
_logger = logger;
_provider = provider;
}
public override string ToString() => nameof(CronJobProcessor);
public Task ProcessAsync(ProcessingContext context)
{
if (context == null) throw new ArgumentNullException(nameof(context));
return ProcessCoreAsync(context);
}
private async Task ProcessCoreAsync(ProcessingContext context)
{
//var storage = context.Storage;
//var jobs = await GetJobsAsync(storage);
var jobs = GetJobs();
if (!jobs.Any())
{
_logger.CronJobsNotFound();
// This will cancel this processor.
throw new OperationCanceledException();
}
_logger.CronJobsScheduling(jobs);
context.ThrowIfStopping();
var computedJobs = Compute(jobs, context.CronJobRegistry.Build());
if (context.IsStopping)
{
return;
}
await Task.WhenAll(computedJobs.Select(j => RunAsync(j, context)));
}
private async Task RunAsync(ComputedCronJob computedJob, ProcessingContext context)
{
//var storage = context.Storage;
var retryBehavior = computedJob.RetryBehavior;
while (!context.IsStopping)
{
var now = DateTime.UtcNow;
var due = ComputeDue(computedJob, now);
var timeSpan = due - now;
if (timeSpan.TotalSeconds > 0)
{
await context.WaitAsync(timeSpan);
}
context.ThrowIfStopping();
using (var scopedContext = context.CreateScope())
{
var provider = scopedContext.Provider;
var job = provider.GetService<IJob>();
var success = true;
try
{
var sw = Stopwatch.StartNew();
await job.ExecuteAsync();
sw.Stop();
computedJob.Retries = 0;
_logger.CronJobExecuted(computedJob.Job.Name, sw.Elapsed.TotalSeconds);
}
catch (Exception ex)
{
success = false;
if (computedJob.Retries == 0)
{
computedJob.FirstTry = DateTime.UtcNow;
}
computedJob.Retries++;
_logger.CronJobFailed(computedJob.Job.Name, ex);
}
if (success)
{
computedJob.Update(DateTime.UtcNow);
}
}
}
}
private DateTime ComputeDue(ComputedCronJob computedJob, DateTime now)
{
computedJob.UpdateNext(now);
var retryBehavior = computedJob.RetryBehavior ?? RetryBehavior.DefaultRetry;
var retries = computedJob.Retries;
if (retries == 0)
{
return computedJob.Next;
}
var realNext = computedJob.Schedule.GetNextOccurrence(now);
if (!retryBehavior.Retry)
{
// No retry. If job failed before, we don't care, just schedule it next as usual.
return realNext;
}
if (retries >= retryBehavior.RetryCount)
{
// Max retries. Just schedule it for the next occurance.
return realNext;
}
// Delay a bit.
return computedJob.FirstTry.AddSeconds(retryBehavior.RetryIn(retries));
}
private CronJob[] GetJobs()
{
var cronJobs = new List<CronJob>();
var entries = _jobRegistry.Build() ?? new CronJobRegistry.Entry[0];
foreach (var entry in entries)
{
cronJobs.Add(new CronJob
{
Name = entry.Name,
TypeName = entry.JobType.AssemblyQualifiedName,
Cron = entry.Cron,
LastRun = DateTime.MinValue
});
}
return cronJobs.ToArray();
}
private ComputedCronJob[] Compute(IEnumerable<CronJob> jobs, CronJobRegistry.Entry[] entries)
=> jobs.Select(j => CreateComputedCronJob(j, entries)).ToArray();
private ComputedCronJob CreateComputedCronJob(CronJob job, CronJobRegistry.Entry[] entries)
{
var entry = entries.First(e => e.Name == job.Name);
return new ComputedCronJob(job, entry);
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Linq;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP
......@@ -13,16 +10,17 @@ namespace DotNetCore.CAP
private static readonly Action<ILogger, Exception> _serverShuttingDown;
private static readonly Action<ILogger, string, Exception> _expectedOperationCanceledException;
private static readonly Action<ILogger, Exception> _cronJobsNotFound;
private static readonly Action<ILogger, int, Exception> _cronJobsScheduling;
private static readonly Action<ILogger, string, double, Exception> _cronJobExecuted;
private static readonly Action<ILogger, string, Exception> _cronJobFailed;
private static readonly Action<ILogger, string, string, Exception> _enqueuingSentMessage;
private static readonly Action<ILogger, string, string, Exception> _enqueuingReceivdeMessage;
private static readonly Action<ILogger, string, Exception> _executingConsumerMethod;
private static readonly Action<ILogger, string, Exception> _receivedMessageRetryExecuting;
private static Action<ILogger, Exception> _jobFailed;
private static Action<ILogger, Exception> _jobFailedWillRetry;
private static Action<ILogger, double, Exception> _jobExecuted;
private static Action<ILogger, int, Exception> _jobRetrying;
private static Action<ILogger, string, Exception> _exceptionOccuredWhileExecutingJob;
static LoggerExtensions()
{
_serverStarting = LoggerMessage.Define<int, int>(
......@@ -45,26 +43,6 @@ namespace DotNetCore.CAP
3,
"Expected an OperationCanceledException, but found '{ExceptionMessage}'.");
_cronJobsNotFound = LoggerMessage.Define(
LogLevel.Debug,
1,
"No cron jobs found to schedule, cancelling processing of cron jobs.");
_cronJobsScheduling = LoggerMessage.Define<int>(
LogLevel.Debug,
2,
"Found {JobCount} cron job(s) to schedule.");
_cronJobExecuted = LoggerMessage.Define<string, double>(
LogLevel.Debug,
3,
"Cron job '{JobName}' executed succesfully. Took: {Seconds} secs.");
_cronJobFailed = LoggerMessage.Define<string>(
LogLevel.Warning,
4,
"Cron job '{jobName}' failed to execute.");
_enqueuingSentMessage = LoggerMessage.Define<string, string>(
LogLevel.Debug,
2,
......@@ -84,6 +62,52 @@ namespace DotNetCore.CAP
LogLevel.Error,
5,
"Received message topic method '{topicName}' failed to execute.");
_jobRetrying = LoggerMessage.Define<int>(
LogLevel.Debug,
3,
"Retrying a job: {Retries}...");
_jobExecuted = LoggerMessage.Define<double>(
LogLevel.Debug,
4,
"Job executed. Took: {Seconds} secs.");
_jobFailed = LoggerMessage.Define(
LogLevel.Warning,
1,
"Job failed to execute.");
_jobFailedWillRetry = LoggerMessage.Define(
LogLevel.Warning,
2,
"Job failed to execute. Will retry.");
_exceptionOccuredWhileExecutingJob = LoggerMessage.Define<string>(
LogLevel.Error,
6,
"An exception occured while trying to execute a job: '{JobId}'. " +
"Requeuing for another retry.");
}
public static void JobFailed(this ILogger logger, Exception ex)
{
_jobFailed(logger, ex);
}
public static void JobFailedWillRetry(this ILogger logger, Exception ex)
{
_jobFailedWillRetry(logger, ex);
}
public static void JobRetrying(this ILogger logger, int retries)
{
_jobRetrying(logger, retries, null);
}
public static void JobExecuted(this ILogger logger, double seconds)
{
_jobExecuted(logger, seconds, null);
}
public static void ConsumerMethodExecutingFailed(this ILogger logger, string methodName, Exception ex)
......@@ -126,24 +150,9 @@ namespace DotNetCore.CAP
_expectedOperationCanceledException(logger, ex.Message, ex);
}
public static void CronJobsNotFound(this ILogger logger)
{
_cronJobsNotFound(logger, null);
}
public static void CronJobsScheduling(this ILogger logger, IEnumerable<CronJob> jobs)
{
_cronJobsScheduling(logger, jobs.Count(), null);
}
public static void CronJobExecuted(this ILogger logger, string name, double seconds)
{
_cronJobExecuted(logger, name, seconds, null);
}
public static void CronJobFailed(this ILogger logger, string name, Exception ex)
public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, string jobId, Exception ex)
{
_cronJobFailed(logger, name, ex);
_exceptionOccuredWhileExecutingJob(logger, jobId, ex);
}
}
}
\ No newline at end of file
namespace DotNetCore.CAP.Infrastructure
namespace DotNetCore.CAP
{
public class MessageContext
{
public string Group { get; set; }
public string KeyName { get; set; }
public string Name { get; set; }
public string Content { get; set; }
}
......
using System;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Infrastructure
namespace DotNetCore.CAP.Models
{
public class CapSentMessage
public class CapPublishedMessage
{
/// <summary>
/// Initializes a new instance of <see cref="CapSentMessage"/>.
/// Initializes a new instance of <see cref="CapPublishedMessage"/>.
/// </summary>
/// <remarks>
/// The Id property is initialized to from a new GUID string value.
/// </remarks>
public CapSentMessage()
public CapPublishedMessage()
{
Id = Guid.NewGuid().ToString();
Added = DateTime.Now;
}
public CapSentMessage(MessageContext message)
public CapPublishedMessage(MessageContext message)
{
KeyName = message.KeyName;
Name = message.Name;
Content = message.Content;
}
public string Id { get; set; }
public int Id { get; set; }
public string KeyName { get; set; }
public string Name { get; set; }
public string Content { get; set; }
public DateTime Added { get; set; }
public DateTime LastRun { get; set; }
public DateTime? ExpiresAt { get; set; }
public int Retries { get; set; }
......
namespace DotNetCore.CAP.Models
{
public class CapQueue
{
public int MessageId { get; set; }
/// <summary>
/// 0 is CapSentMessage, 1 is CapReceviedMessage
/// </summary>
public MessageType MessageType { get; set; }
}
}
\ No newline at end of file
using System;
using DotNetCore.CAP.Infrastructure;
namespace DotNetCore.CAP.Infrastructure
namespace DotNetCore.CAP.Models
{
public class CapReceivedMessage
{
......@@ -12,28 +13,27 @@ namespace DotNetCore.CAP.Infrastructure
/// </remarks>
public CapReceivedMessage()
{
Id = Guid.NewGuid().ToString();
Added = DateTime.Now;
}
public CapReceivedMessage(MessageContext message) : this()
{
Group = message.Group;
KeyName = message.KeyName;
Name = message.Name;
Content = message.Content;
}
public string Id { get; set; }
public int Id { get; set; }
public string Group { get; set; }
public string KeyName { get; set; }
public string Name { get; set; }
public string Content { get; set; }
public DateTime Added { get; set; }
public DateTime LastRun { get; set; }
public DateTime? ExpiresAt { get; set; }
public int Retries { get; set; }
......@@ -44,7 +44,7 @@ namespace DotNetCore.CAP.Infrastructure
return new MessageContext
{
Group = Group,
KeyName = KeyName,
Name = Name,
Content = Content
};
}
......
namespace DotNetCore.CAP.Models
{
public enum MessageType
{
Publish,
Subscribe
}
}
\ No newline at end of file
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Linq;
namespace DotNetCore.CAP
......@@ -18,6 +19,8 @@ namespace DotNetCore.CAP
/// </summary>
public bool Succeeded { get; set; }
public Exception Exception { get; set; }
/// <summary>
/// An <see cref="IEnumerable{T}"/> of <see cref="OperateError"/>s containing an errors
/// that occurred during the operation.
......@@ -29,7 +32,7 @@ namespace DotNetCore.CAP
/// Returns an <see cref="OperateResult"/> indicating a successful identity operation.
/// </summary>
/// <returns>An <see cref="OperateResult"/> indicating a successful operation.</returns>
public static OperateResult Success { get; } = new OperateResult {Succeeded = true};
public static OperateResult Success { get; } = new OperateResult { Succeeded = true };
/// <summary>
/// Creates an <see cref="OperateResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.
......@@ -38,7 +41,18 @@ namespace DotNetCore.CAP
/// <returns>An <see cref="OperateResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.</returns>
public static OperateResult Failed(params OperateError[] errors)
{
var result = new OperateResult {Succeeded = false};
var result = new OperateResult { Succeeded = false };
if (errors != null)
{
result._errors.AddRange(errors);
}
return result;
}
public static OperateResult Failed(Exception ex, params OperateError[] errors)
{
var result = new OperateResult { Succeeded = false };
result.Exception = ex;
if (errors != null)
{
result._errors.AddRange(errors);
......
namespace DotNetCore.CAP.Processor
{
public interface IAdditionalProcessor : IProcessor
{
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
namespace DotNetCore.CAP.Processor
{
public class KafkaJobProcessor : IJobProcessor
public class DefaultDispatcher : IDispatcher
{
private readonly KafkaOptions _kafkaOptions;
private readonly CancellationTokenSource _cts;
private readonly IQueueExecutorFactory _queueExecutorFactory;
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly CancellationTokenSource _cts;
private readonly TimeSpan _pollingDelay;
public KafkaJobProcessor(
IOptions<CapOptions> capOptions,
IOptions<KafkaOptions> kafkaOptions,
ILogger<KafkaJobProcessor> logger,
IServiceProvider provider)
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public DefaultDispatcher(
IServiceProvider provider,
IQueueExecutorFactory queueExecutorFactory,
IOptions<CapOptions> capOptions,
ILogger<DefaultDispatcher> logger)
{
_logger = logger;
_kafkaOptions = kafkaOptions.Value;
_queueExecutorFactory = queueExecutorFactory;
_provider = provider;
_cts = new CancellationTokenSource();
_pollingDelay = TimeSpan.FromSeconds(capOptions.Value.PollingDelay);
......@@ -41,9 +36,11 @@ namespace DotNetCore.CAP.Kafka
public Task ProcessAsync(ProcessingContext context)
{
if (context == null) throw new ArgumentNullException(nameof(context));
if (context == null)
throw new ArgumentNullException(nameof(context));
context.ThrowIfStopping();
return ProcessCoreAsync(context);
}
......@@ -60,10 +57,8 @@ namespace DotNetCore.CAP.Kafka
if (!worked)
{
var token = GetTokenToWaitOn(context);
await WaitHandleEx.WaitAnyAsync(PulseEvent, token.WaitHandle, _pollingDelay);
}
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay);
}
finally
{
......@@ -78,44 +73,22 @@ namespace DotNetCore.CAP.Kafka
private async Task<bool> Step(ProcessingContext context)
{
var fetched = default(IFetchedMessage);
using (var scopedContext = context.CreateScope())
{
var provider = scopedContext.Provider;
var messageStore = provider.GetRequiredService<ICapMessageStore>();
var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync();
if (message == null) return true;
try
{
var sp = Stopwatch.StartNew();
message.StatusName = StatusName.Processing;
await messageStore.UpdateSentMessageAsync(message);
await ExecuteJobAsync(message.KeyName, message.Content);
var connection = provider.GetRequiredService<IStorageConnection>();
sp.Stop();
message.StatusName = StatusName.Succeeded;
await messageStore.UpdateSentMessageAsync(message);
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
catch (Exception ex)
if ((fetched = await connection.FetchNextMessageAsync()) != null)
{
_logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex);
return false;
using (fetched)
{
var queueExecutor = _queueExecutorFactory.GetInstance(fetched.MessageType);
await queueExecutor.ExecuteAsync(connection, fetched);
}
}
}
return true;
}
private Task ExecuteJobAsync(string topic, string content)
{
var config = _kafkaOptions.AsRdkafkaConfig();
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
producer.ProduceAsync(topic, null, content);
producer.Flush();
}
return Task.CompletedTask;
return fetched != null;
}
}
}
\ No newline at end of file
namespace DotNetCore.CAP.Processor
{
public interface IDispatcher : IProcessor
{
bool Waiting { get; }
}
}
\ No newline at end of file
......@@ -3,60 +3,67 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public class JobProcessingServer : IProcessingServer, IDisposable
public class CapProcessingServer : IProcessingServer, IDisposable
{
private readonly ILogger _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly IServiceProvider _provider;
private readonly CancellationTokenSource _cts;
private readonly CapOptions _options;
private readonly DefaultCronJobRegistry _defaultJobRegistry;
private IJobProcessor[] _processors;
private IProcessor[] _processors;
private IList<IDispatcher> _messageDispatchers;
private ProcessingContext _context;
private Task _compositeTask;
private bool _disposed;
public JobProcessingServer(
ILogger<JobProcessingServer> logger,
public CapProcessingServer(
ILogger<CapProcessingServer> logger,
ILoggerFactory loggerFactory,
IServiceProvider provider,
DefaultCronJobRegistry defaultJobRegistry,
IOptions<CapOptions> options)
{
_logger = logger;
_loggerFactory = loggerFactory;
_provider = provider;
_defaultJobRegistry = defaultJobRegistry;
_options = options.Value;
_cts = new CancellationTokenSource();
_messageDispatchers = new List<IDispatcher>();
}
public void Start()
{
var processorCount = Environment.ProcessorCount;
//processorCount = 1;
_processors = GetProcessors(processorCount);
_logger.ServerStarting(processorCount, processorCount);
_logger.ServerStarting(processorCount, _processors.Length);
_context = new ProcessingContext(
_provider,
_defaultJobRegistry,
_cts.Token);
_context = new ProcessingContext(_provider, _cts.Token);
var processorTasks = _processors
.Select(InfiniteRetry)
.Select(p => InfiniteRetry(p))
.Select(p => p.ProcessAsync(_context));
_compositeTask = Task.WhenAll(processorTasks);
}
public void Pulse()
{
if (!AllProcessorsWaiting())
{
// Some processor is still executing jobs so no need to pulse.
return;
}
_logger.LogTrace("Pulsing the Queuer.");
PublishQueuer.PulseEvent.Set();
}
public void Dispose()
{
if (_disposed)
......@@ -69,7 +76,7 @@ namespace DotNetCore.CAP.Job
_cts.Cancel();
try
{
_compositeTask.Wait((int) TimeSpan.FromSeconds(60).TotalMilliseconds);
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds);
}
catch (AggregateException ex)
{
......@@ -81,30 +88,37 @@ namespace DotNetCore.CAP.Job
}
}
private IJobProcessor InfiniteRetry(IJobProcessor inner)
private bool AllProcessorsWaiting()
{
foreach (var processor in _messageDispatchers)
{
if (!processor.Waiting)
{
return false;
}
}
return true;
}
private IProcessor InfiniteRetry(IProcessor inner)
{
return new InfiniteRetryProcessor(inner, _loggerFactory);
}
private IJobProcessor[] GetProcessors(int processorCount)
private IProcessor[] GetProcessors(int processorCount)
{
var returnedProcessors = new List<IJobProcessor>();
var returnedProcessors = new List<IProcessor>();
for (int i = 0; i < processorCount; i++)
{
var processors = _provider.GetServices<IJobProcessor>();
foreach (var processor in processors)
{
if (processor is CronJobProcessor)
{
if (i == 0) // only add first cronJob
returnedProcessors.Add(processor);
}
else
{
returnedProcessors.Add(processor);
}
}
var messageProcessors = _provider.GetRequiredService<IDispatcher>();
_messageDispatchers.Add(messageProcessors);
}
returnedProcessors.AddRange(_messageDispatchers);
returnedProcessors.Add(_provider.GetRequiredService<PublishQueuer>());
returnedProcessors.Add(_provider.GetRequiredService<SubscribeQueuer>());
returnedProcessors.Add(_provider.GetRequiredService<IAdditionalProcessor>());
return returnedProcessors.ToArray();
}
......
......@@ -2,15 +2,15 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public class InfiniteRetryProcessor : IJobProcessor
public class InfiniteRetryProcessor : IProcessor
{
private readonly IJobProcessor _inner;
private readonly IProcessor _inner;
private readonly ILogger _logger;
public InfiniteRetryProcessor(
IJobProcessor inner,
IProcessor inner,
ILoggerFactory loggerFactory)
{
_inner = inner;
......@@ -33,10 +33,7 @@ namespace DotNetCore.CAP.Job
}
catch (Exception ex)
{
_logger.LogWarning(
1,
ex,
"Prcessor '{ProcessorName}' failed. Retrying...", _inner.ToString());
_logger.LogWarning(1, ex, "Prcessor '{ProcessorName}' failed. Retrying...", _inner.ToString());
}
}
}
......
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor
{
public class PublishQueuer : IProcessor
{
private ILogger _logger;
private CapOptions _options;
private IStateChanger _stateChanger;
private IServiceProvider _provider;
private TimeSpan _pollingDelay;
public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public PublishQueuer(
ILogger<PublishQueuer> logger,
IOptions<CapOptions> options,
IStateChanger stateChanger,
IServiceProvider provider)
{
_logger = logger;
_options = options.Value;
_stateChanger = stateChanger;
_provider = provider;
_pollingDelay = TimeSpan.FromSeconds(_options.PollingDelay);
}
public async Task ProcessAsync(ProcessingContext context)
{
using (var scope = _provider.CreateScope())
{
CapPublishedMessage sentMessage;
var provider = scope.ServiceProvider;
var connection = provider.GetRequiredService<IStorageConnection>();
while (
!context.IsStopping &&
(sentMessage = await connection.GetNextPublishedMessageToBeEnqueuedAsync()) != null)
{
var state = new EnqueuedState();
using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(sentMessage, state, transaction);
await transaction.CommitAsync();
}
}
}
context.ThrowIfStopping();
DefaultDispatcher.PulseEvent.Set();
await WaitHandleEx.WaitAnyAsync(PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay);
}
}
}
\ No newline at end of file
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor
{
public class SubscribeQueuer : IProcessor
{
private ILogger _logger;
private CapOptions _options;
private IStateChanger _stateChanger;
private IServiceProvider _provider;
private TimeSpan _pollingDelay;
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public SubscribeQueuer(
ILogger<SubscribeQueuer> logger,
IOptions<CapOptions> options,
IStateChanger stateChanger,
IServiceProvider provider)
{
_logger = logger;
_options = options.Value;
_stateChanger = stateChanger;
_provider = provider;
_pollingDelay = TimeSpan.FromSeconds(_options.PollingDelay);
}
public async Task ProcessAsync(ProcessingContext context)
{
using (var scope = _provider.CreateScope())
{
CapReceivedMessage message;
var provider = scope.ServiceProvider;
var connection = provider.GetRequiredService<IStorageConnection>();
while (
!context.IsStopping &&
(message = await connection.GetNextReceviedMessageToBeEnqueuedAsync()) != null)
{
var state = new EnqueuedState();
using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(message, state, transaction);
await transaction.CommitAsync();
}
}
}
context.ThrowIfStopping();
DefaultDispatcher.PulseEvent.Set();
await WaitHandleEx.WaitAnyAsync(PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay);
}
}
}
\ No newline at end of file
using System.Threading.Tasks;
namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public interface IJobProcessor
public interface IProcessor
{
Task ProcessAsync(ProcessingContext context);
}
......
......@@ -3,7 +3,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public class ProcessingContext : IDisposable
{
......@@ -16,24 +16,19 @@ namespace DotNetCore.CAP.Job
private ProcessingContext(ProcessingContext other)
{
Provider = other.Provider;
CronJobRegistry = other.CronJobRegistry;
CancellationToken = other.CancellationToken;
}
public ProcessingContext(
IServiceProvider provider,
CronJobRegistry cronJobRegistry,
CancellationToken cancellationToken)
{
Provider = provider;
CronJobRegistry = cronJobRegistry;
CancellationToken = cancellationToken;
}
public IServiceProvider Provider { get; private set; }
public CronJobRegistry CronJobRegistry { get; private set; }
public CancellationToken CancellationToken { get; }
public bool IsStopping => CancellationToken.IsCancellationRequested;
......
using System;
namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public class RetryBehavior
{
......@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.Job
{
DefaultRetryCount = 25;
DefaultRetryInThunk = retries =>
(int) Math.Round(Math.Pow(retries - 1, 4) + 15 + (_random.Next(30) * (retries)));
(int)Math.Round(Math.Pow(retries - 1, 4) + 15 + (_random.Next(30) * (retries)));
DefaultRetry = new RetryBehavior(true);
NoRetry = new RetryBehavior(false);
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment