Commit aa597fca authored by yangxiaodong's avatar yangxiaodong

Migration project to DotNetCore group, modify the namespace and assembly name.

parent e56637f5
This source diff could not be displayed because it is too large. You can view the blob instead.

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26430.6
VisualStudioVersion = 15.0.26430.13
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{57A8A8E5-5715-41BF-A0A6-46B819933FBC}"
ProjectSection(SolutionItems) = preProject
CAP.vssettings = CAP.vssettings
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Shared", "Shared", "{9E5A7F49-8E31-4A71-90CC-1DA9AEDA99EE}"
ProjectSection(SolutionItems) = preProject
......@@ -18,21 +21,17 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Shared", "Shared", "{9E5A7F
{82A7F48D-3B50-4B1E-B82E-3ADA8210C358} = {82A7F48D-3B50-4B1E-B82E-3ADA8210C358}
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cap.Consistency", "src\Cap.Consistency\Cap.Consistency.csproj", "{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cap.Consistency.Test", "test\Cap.Consistency.Test\Cap.Consistency.Test.csproj", "{3A444CF8-1611-407F-8D32-5D0CDC3DD49D}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP", "src\DotNetCore.CAP\DotNetCore.CAP.csproj", "{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cap.Consistency.EntityFrameworkCore", "src\Cap.Consistency.EntityFrameworkCore\Cap.Consistency.EntityFrameworkCore.csproj", "{96111249-C4C3-4DC9-A887-32D583723AB1}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cap.Consistency.EntityFrameworkCore.Test", "test\Cap.Consistency.EntityFrameworkCore.Test\Cap.Consistency.EntityFrameworkCore.Test.csproj", "{7442C942-1DDC-40E4-8F1B-654E721EAA45}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.EntityFrameworkCore", "src\DotNetCore.CAP.EntityFrameworkCore\DotNetCore.CAP.EntityFrameworkCore.csproj", "{96111249-C4C3-4DC9-A887-32D583723AB1}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{3A6B6931-A123-477A-9469-8B468B5385AF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.Kafka", "samples\Sample.Kafka\Sample.Kafka.csproj", "{2F095ED9-5BC9-4512-9013-A47685FB2508}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cap.Consistency.Kafka", "src\Cap.Consistency.Kafka\Cap.Consistency.Kafka.csproj", "{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.Kafka", "src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj", "{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cap.Consistency.RabbitMQ", "src\Cap.Consistency.RabbitMQ\Cap.Consistency.RabbitMQ.csproj", "{9961B80E-0718-4280-B2A0-271B003DE26B}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.RabbitMQ", "src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj", "{9961B80E-0718-4280-B2A0-271B003DE26B}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{10C0818D-9160-4B80-BB86-DDE925B64D43}"
ProjectSection(SolutionItems) = preProject
......@@ -46,6 +45,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{10C0818D
build\version.props = build\version.props
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.EntityFrameworkCore.Test", "test\DotNetCore.CAP.EntityFrameworkCore.Test\DotNetCore.CAP.EntityFrameworkCore.Test.csproj", "{69370370-9873-4D6A-965D-D1E16694047D}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Test", "test\DotNetCore.CAP.Test\DotNetCore.CAP.Test.csproj", "{F608B509-A99B-4AC7-8227-42051DD4A578}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -56,18 +59,10 @@ Global
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Release|Any CPU.Build.0 = Release|Any CPU
{3A444CF8-1611-407F-8D32-5D0CDC3DD49D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3A444CF8-1611-407F-8D32-5D0CDC3DD49D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3A444CF8-1611-407F-8D32-5D0CDC3DD49D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3A444CF8-1611-407F-8D32-5D0CDC3DD49D}.Release|Any CPU.Build.0 = Release|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Release|Any CPU.Build.0 = Release|Any CPU
{7442C942-1DDC-40E4-8F1B-654E721EAA45}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7442C942-1DDC-40E4-8F1B-654E721EAA45}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7442C942-1DDC-40E4-8F1B-654E721EAA45}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7442C942-1DDC-40E4-8F1B-654E721EAA45}.Release|Any CPU.Build.0 = Release|Any CPU
{2F095ED9-5BC9-4512-9013-A47685FB2508}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2F095ED9-5BC9-4512-9013-A47685FB2508}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2F095ED9-5BC9-4512-9013-A47685FB2508}.Release|Any CPU.ActiveCfg = Release|Any CPU
......@@ -80,6 +75,14 @@ Global
{9961B80E-0718-4280-B2A0-271B003DE26B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9961B80E-0718-4280-B2A0-271B003DE26B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9961B80E-0718-4280-B2A0-271B003DE26B}.Release|Any CPU.Build.0 = Release|Any CPU
{69370370-9873-4D6A-965D-D1E16694047D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{69370370-9873-4D6A-965D-D1E16694047D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{69370370-9873-4D6A-965D-D1E16694047D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{69370370-9873-4D6A-965D-D1E16694047D}.Release|Any CPU.Build.0 = Release|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F608B509-A99B-4AC7-8227-42051DD4A578}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -87,11 +90,11 @@ Global
GlobalSection(NestedProjects) = preSolution
{9E5A7F49-8E31-4A71-90CC-1DA9AEDA99EE} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{3A444CF8-1611-407F-8D32-5D0CDC3DD49D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{96111249-C4C3-4DC9-A887-32D583723AB1} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{7442C942-1DDC-40E4-8F1B-654E721EAA45} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{2F095ED9-5BC9-4512-9013-A47685FB2508} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{9961B80E-0718-4280-B2A0-271B003DE26B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{69370370-9873-4D6A-965D-D1E16694047D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{F608B509-A99B-4AC7-8227-42051DD4A578} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
EndGlobalSection
EndGlobal
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
......
using System;
using System.Threading.Tasks;
using Cap.Consistency;
using Cap.Consistency.Consumer;
using Cap.Consistency.Kafka;
using DotNetCore.CAP;
using DotNetCore.CAP.Kafka;
using Microsoft.AspNetCore.Mvc;
namespace Sample.Kafka.Controllers
......@@ -10,9 +9,9 @@ namespace Sample.Kafka.Controllers
[Route("api/[controller]")]
public class ValuesController : Controller, IConsumerService
{
private readonly IProducerClient _producer;
private readonly ICapProducerService _producer;
public ValuesController(IProducerClient producer) {
public ValuesController(ICapProducerService producer) {
_producer = producer;
}
......
......@@ -4,7 +4,7 @@ using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Sample.Kafka;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
namespace Sample.Kafka.Migrations
{
......@@ -18,7 +18,7 @@ namespace Sample.Kafka.Migrations
.HasAnnotation("ProductVersion", "1.1.2")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("Cap.Consistency.Infrastructure.ConsistencyMessage", b =>
modelBuilder.Entity("DotNetCore.CAP.Infrastructure.ConsistencyMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
......
......@@ -4,7 +4,7 @@ using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Sample.Kafka;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
namespace Sample.Kafka.Migrations
{
......@@ -17,7 +17,7 @@ namespace Sample.Kafka.Migrations
.HasAnnotation("ProductVersion", "1.1.2")
.HasAnnotation("SqlServer:ValueGenerationStrategy", SqlServerValueGenerationStrategy.IdentityColumn);
modelBuilder.Entity("Cap.Consistency.Infrastructure.ConsistencyMessage", b =>
modelBuilder.Entity("DotNetCore.CAP.Infrastructure.ConsistencyMessage", b =>
{
b.Property<string>("Id")
.ValueGeneratedOnAdd();
......
......@@ -23,10 +23,10 @@
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="1.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Cap.Consistency.EntityFrameworkCore\Cap.Consistency.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\..\src\Cap.Consistency.Kafka\Cap.Consistency.Kafka.csproj" />
<ProjectReference Include="..\..\src\Cap.Consistency.RabbitMQ\Cap.Consistency.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\Cap.Consistency\Cap.Consistency.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.EntityFrameworkCore\DotNetCore.CAP.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
namespace Cap.Consistency
{
public interface IProducerClient
{
Task SendAsync(string topic, string content);
}
}
using System;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
using Microsoft.EntityFrameworkCore;
namespace Cap.Consistency.EntityFrameworkCore
namespace DotNetCore.CAP.EntityFrameworkCore
{
/// <summary>
/// Base class for the Entity Framework database context used for consistency.
......@@ -33,8 +32,10 @@ namespace Cap.Consistency.EntityFrameworkCore
/// <param name="modelBuilder">
/// The builder being used to construct the model for this context.
/// </param>
protected override void OnModelCreating(ModelBuilder modelBuilder) {
modelBuilder.Entity<ConsistencyMessage>(b => {
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<ConsistencyMessage>(b =>
{
b.HasKey(m => m.Id);
b.ToTable("ConsistencyMessages");
});
......
using Cap.Consistency;
using Cap.Consistency.EntityFrameworkCore;
using DotNetCore.CAP;
using DotNetCore.CAP.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Contains extension methods to <see cref="ConsistencyBuilder"/> for adding entity framework stores.
/// Contains extension methods to <see cref="CapBuilder"/> for adding entity framework stores.
/// </summary>
public static class ConsistencyEntityFrameworkBuilderExtensions
{
......@@ -13,11 +13,12 @@ namespace Microsoft.Extensions.DependencyInjection
/// Adds an Entity Framework implementation of message stores.
/// </summary>
/// <typeparam name="TContext">The Entity Framework database context to use.</typeparam>
/// <param name="services">The <see cref="ConsistencyBuilder"/> instance this method extends.</param>
/// <returns>The <see cref="ConsistencyBuilder"/> instance this method extends.</returns>
public static ConsistencyBuilder AddEntityFrameworkStores<TContext>(this ConsistencyBuilder builder)
where TContext : DbContext {
builder.Services.AddScoped<IConsistencyMessageStore, ConsistencyMessageStore<TContext>>();
/// <param name="services">The <see cref="CapBuilder"/> instance this method extends.</param>
/// <returns>The <see cref="CapBuilder"/> instance this method extends.</returns>
public static CapBuilder AddEntityFrameworkStores<TContext>(this CapBuilder builder)
where TContext : DbContext
{
builder.Services.AddScoped<ICapMessageStore, ConsistencyMessageStore<TContext>>();
return builder;
}
......
......@@ -2,10 +2,10 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
using Microsoft.EntityFrameworkCore;
namespace Cap.Consistency.EntityFrameworkCore
namespace DotNetCore.CAP.EntityFrameworkCore
{
/// <summary>
/// Represents a new instance of a persistence store for the specified message types.
......@@ -13,7 +13,7 @@ namespace Cap.Consistency.EntityFrameworkCore
/// <typeparam name="ConsistencyMessage">The type representing a message.</typeparam>
/// <typeparam name="TContext">The type of the data context class used to access the store.</typeparam>
/// <typeparam name="TKey">The type of the primary key for a message.</typeparam>
public class ConsistencyMessageStore<TContext> : IConsistencyMessageStore where TContext : DbContext
public class ConsistencyMessageStore<TContext> : ICapMessageStore where TContext : DbContext
{
private bool _disposed;
......@@ -21,8 +21,10 @@ namespace Cap.Consistency.EntityFrameworkCore
/// Constructs a new instance of <see cref="ConsistencyMessageStore{ConsistencyMessage, TContext, TKey}"/>.
/// </summary>
/// <param name="context">The <see cref="DbContext"/>.</param>
public ConsistencyMessageStore(TContext context) {
if (context == null) {
public ConsistencyMessageStore(TContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}
Context = context;
......@@ -38,10 +40,12 @@ namespace Cap.Consistency.EntityFrameworkCore
/// <param name="message">The message to create.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/> of the creation operation.</returns>
public async virtual Task<OperateResult> CreateAsync(ConsistencyMessage message, CancellationToken cancellationToken) {
public async virtual Task<OperateResult> CreateAsync(ConsistencyMessage message, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
if (message == null) {
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
Context.Add(message);
......@@ -55,18 +59,22 @@ namespace Cap.Consistency.EntityFrameworkCore
/// <param name="message">The message to delete.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/> of the update operation.</returns>
public async virtual Task<OperateResult> DeleteAsync(ConsistencyMessage message, CancellationToken cancellationToken) {
public async virtual Task<OperateResult> DeleteAsync(ConsistencyMessage message, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
if (message == null) {
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
Context.Remove(message);
try {
try
{
await SaveChanges(cancellationToken);
}
catch (DbUpdateConcurrencyException ex) {
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message });
}
return OperateResult.Success;
......@@ -80,7 +88,8 @@ namespace Cap.Consistency.EntityFrameworkCore
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the message matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
public virtual Task<ConsistencyMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken) {
public virtual Task<ConsistencyMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
return MessageSet.FindAsync(new object[] { messageId }, cancellationToken);
......@@ -92,10 +101,12 @@ namespace Cap.Consistency.EntityFrameworkCore
/// <param name="message">The message whose identifier should be retrieved.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation, containing the identifier for the specified <paramref name="message"/>.</returns>
public Task<string> GeConsistencyMessageIdAsync(ConsistencyMessage message, CancellationToken cancellationToken) {
public Task<string> GeConsistencyMessageIdAsync(ConsistencyMessage message, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
if (message == null) {
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
return Task.FromResult(message.Id);
......@@ -107,26 +118,31 @@ namespace Cap.Consistency.EntityFrameworkCore
/// <param name="message">The message to update.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/> of the update operation.</returns>
public async virtual Task<OperateResult> UpdateAsync(ConsistencyMessage message, CancellationToken cancellationToken) {
public async virtual Task<OperateResult> UpdateAsync(ConsistencyMessage message, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
if (message == null) {
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
Context.Attach(message);
message.UpdateTime = DateTime.Now;
Context.Update(message);
try {
try
{
await SaveChanges(cancellationToken);
}
catch (DbUpdateConcurrencyException ex) {
catch (DbUpdateConcurrencyException ex)
{
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message });
}
return OperateResult.Success;
}
public Task<ConsistencyMessage> GetFirstEnqueuedMessageAsync(CancellationToken cancellationToken) {
public Task<ConsistencyMessage> GetFirstEnqueuedMessageAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
return MessageSet.AsNoTracking().Where(x => x.Status == MessageStatus.WaitForSend).FirstOrDefaultAsync(cancellationToken);
}
......@@ -155,15 +171,18 @@ namespace Cap.Consistency.EntityFrameworkCore
/// <summary>Saves the current store.</summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation.</returns>
protected Task SaveChanges(CancellationToken cancellationToken) {
protected Task SaveChanges(CancellationToken cancellationToken)
{
return AutoSaveChanges ? Context.SaveChangesAsync(cancellationToken) : Task.CompletedTask;
}
/// <summary>
/// Throws if this class has been disposed.
/// </summary>
protected void ThrowIfDisposed() {
if (_disposed) {
protected void ThrowIfDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().Name);
}
}
......@@ -171,7 +190,8 @@ namespace Cap.Consistency.EntityFrameworkCore
/// <summary>
/// Dispose the store
/// </summary>
public void Dispose() {
public void Dispose()
{
_disposed = true;
}
}
......
......@@ -2,8 +2,8 @@
<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<AssemblyName>Cap.Consistency.EntityFrameworkCore</AssemblyName>
<PackageId>Cap.Consistency.EntityFrameworkCore</PackageId>
<AssemblyName>DotNetCore.CAP.EntityFrameworkCore</AssemblyName>
<PackageId>DotNetCore.CAP.EntityFrameworkCore</PackageId>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50</PackageTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
......@@ -11,14 +11,14 @@
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Cap.Consistency\Cap.Consistency.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="System.ComponentModel.TypeConverter" Version="4.3.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
......@@ -3,6 +3,12 @@
<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<AssemblyName>DotNetCore.CAP.Kafka</AssemblyName>
<PackageId>DotNetCore.CAP.Kafka</PackageId>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50</PackageTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
</PropertyGroup>
<ItemGroup>
......@@ -10,7 +16,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Cap.Consistency\Cap.Consistency.csproj" />
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>
\ No newline at end of file
......@@ -4,19 +4,18 @@ using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Job;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Cap.Consistency.Kafka
namespace DotNetCore.CAP.Kafka
{
public class KafkaJobProcessor : IJobProcessor
{
private readonly ConsistencyOptions _options;
private readonly CancellationTokenSource _cts;
......@@ -29,8 +28,8 @@ namespace Cap.Consistency.Kafka
public KafkaJobProcessor(
IOptions<ConsistencyOptions> options,
ILogger<KafkaJobProcessor> logger,
IServiceProvider provider) {
IServiceProvider provider)
{
_logger = logger;
_options = options.Value;
_provider = provider;
......@@ -40,45 +39,53 @@ namespace Cap.Consistency.Kafka
public bool Waiting { get; private set; }
public Task ProcessAsync(ProcessingContext context) {
public Task ProcessAsync(ProcessingContext context)
{
if (context == null) throw new ArgumentNullException(nameof(context));
context.ThrowIfStopping();
return ProcessCoreAsync(context);
}
public async Task ProcessCoreAsync(ProcessingContext context) {
try {
public async Task ProcessCoreAsync(ProcessingContext context)
{
try
{
var worked = await Step(context);
context.ThrowIfStopping();
Waiting = true;
if (!worked) {
if (!worked)
{
var token = GetTokenToWaitOn(context);
}
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay);
}
finally {
finally
{
Waiting = false;
}
}
protected virtual CancellationToken GetTokenToWaitOn(ProcessingContext context) {
protected virtual CancellationToken GetTokenToWaitOn(ProcessingContext context)
{
return context.CancellationToken;
}
private async Task<bool> Step(ProcessingContext context) {
using (var scopedContext = context.CreateScope()) {
private async Task<bool> Step(ProcessingContext context)
{
using (var scopedContext = context.CreateScope())
{
var provider = scopedContext.Provider;
var messageStore = provider.GetRequiredService<IConsistencyMessageStore>();
try {
var messageStore = provider.GetRequiredService<ICapMessageStore>();
try
{
var message = await messageStore.GetFirstEnqueuedMessageAsync(_cts.Token);
if (message != null) {
if (message != null)
{
var sp = Stopwatch.StartNew();
message.Status = MessageStatus.Processing;
await messageStore.UpdateAsync(message, _cts.Token);
......@@ -87,10 +94,12 @@ namespace Cap.Consistency.Kafka
sp.Stop();
if (!jobResult) {
if (!jobResult)
{
_logger.JobFailed(new Exception("topic send failed"));
}
else {
else
{
message.Status = MessageStatus.Successed;
await messageStore.UpdateAsync(message, _cts.Token);
//await messageStore.DeleteAsync(message, _cts.Token);
......@@ -98,31 +107,37 @@ namespace Cap.Consistency.Kafka
}
}
}
catch (Exception ex) {
catch (Exception )
{
return false;
}
}
return true;
}
private bool ExecuteJob(string topic, string content) {
try {
private bool ExecuteJob(string topic, string content)
{
try
{
var config = new Dictionary<string, object> { { "bootstrap.servers", _options.BrokerUrlList } };
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) {
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
var message = producer.ProduceAsync(topic, null, content).Result;
if (message.Error.Code == ErrorCode.NoError) {
if (message.Error.Code == ErrorCode.NoError)
{
return true;
}
else {
else
{
return false;
}
}
}
catch (Exception ex) {
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(topic, ex);
return false;
}
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using DotNetCore.CAP.Infrastructure;
namespace Cap.Consistency.Kafka
namespace DotNetCore.CAP.Kafka
{
public class KafkaConsumerClient : IConsumerClient
{
......@@ -19,38 +18,46 @@ namespace Cap.Consistency.Kafka
public IDeserializer<string> StringDeserializer { get; set; }
public KafkaConsumerClient(string groupId, string bootstrapServers) {
public KafkaConsumerClient(string groupId, string bootstrapServers)
{
_groupId = groupId;
_bootstrapServers = bootstrapServers;
StringDeserializer = new StringDeserializer(Encoding.UTF8);
}
public void Subscribe(string topic) {
public void Subscribe(string topic)
{
Subscribe(topic, 0);
}
public void Subscribe(string topicName, int partition) {
if (_consumerClient == null) {
public void Subscribe(string topicName, int partition)
{
if (_consumerClient == null)
{
InitKafkaClient();
}
_consumerClient.Assignment.Add(new TopicPartition(topicName, partition));
_consumerClient.Subscribe(topicName);
}
public void Listening(TimeSpan timeout) {
while (true) {
public void Listening(TimeSpan timeout)
{
while (true)
{
_consumerClient.Poll(timeout);
}
}
public void Dispose() {
public void Dispose()
{
_consumerClient.Dispose();
}
#region private methods
private void InitKafkaClient() {
private void InitKafkaClient()
{
var config = new Dictionary<string, object>{
{ "group.id", _groupId },
{ "bootstrap.servers", _bootstrapServers }
......@@ -60,8 +67,10 @@ namespace Cap.Consistency.Kafka
_consumerClient.OnMessage += ConsumerClient_OnMessage;
}
private void ConsumerClient_OnMessage(object sender, Message<Null, string> e) {
var message = new DeliverMessage {
private void ConsumerClient_OnMessage(object sender, Message<Null, string> e)
{
var message = new DeliverMessage
{
MessageKey = e.Topic,
Value = e.Value,
Body = Encoding.UTF8.GetBytes(e.Value)
......
using Cap.Consistency.Consumer;
namespace Cap.Consistency.Kafka
namespace DotNetCore.CAP.Kafka
{
public class KafkaConsumerClientFactory : IConsumerClientFactory
{
public IConsumerClient Create(string groupId, string clientHostAddress) {
public IConsumerClient Create(string groupId, string clientHostAddress)
{
return new KafkaConsumerClient(groupId, clientHostAddress);
}
}
......
using Cap.Consistency.Abstractions;
using DotNetCore.CAP.Abstractions;
namespace Cap.Consistency.Kafka
namespace DotNetCore.CAP.Kafka
{
public class KafkaTopicAttribute : TopicAttribute
{
......
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency.Kafka
namespace DotNetCore.CAP.Kafka
{
internal static class LoggerExtensions
{
......@@ -20,7 +18,8 @@ namespace Cap.Consistency.Kafka
private static Action<ILogger, int, Exception> _jobCouldNotBeLoaded;
private static Action<ILogger, string, Exception> _exceptionOccuredWhileExecutingJob;
static LoggerExtensions() {
static LoggerExtensions()
{
_collectingExpiredEntities = LoggerMessage.Define(
LogLevel.Debug,
1,
......@@ -73,44 +72,53 @@ namespace Cap.Consistency.Kafka
"Requeuing for another retry.");
}
public static void CollectingExpiredEntities(this ILogger logger) {
public static void CollectingExpiredEntities(this ILogger logger)
{
_collectingExpiredEntities(logger, null);
}
public static void Installing(this ILogger logger) {
public static void Installing(this ILogger logger)
{
_installing(logger, null);
}
public static void InstallingError(this ILogger logger, Exception ex) {
public static void InstallingError(this ILogger logger, Exception ex)
{
_installingError(logger, ex);
}
public static void InstallingSuccess(this ILogger logger) {
public static void InstallingSuccess(this ILogger logger)
{
_installingSuccess(logger, null);
}
public static void JobFailed(this ILogger logger, Exception ex) {
public static void JobFailed(this ILogger logger, Exception ex)
{
_jobFailed(logger, ex);
}
public static void JobFailedWillRetry(this ILogger logger, Exception ex) {
public static void JobFailedWillRetry(this ILogger logger, Exception ex)
{
_jobFailedWillRetry(logger, ex);
}
public static void JobRetrying(this ILogger logger, int retries) {
public static void JobRetrying(this ILogger logger, int retries)
{
_jobRetrying(logger, retries, null);
}
public static void JobExecuted(this ILogger logger, double seconds) {
public static void JobExecuted(this ILogger logger, double seconds)
{
_jobExecuted(logger, seconds, null);
}
public static void JobCouldNotBeLoaded(this ILogger logger, int jobId, Exception ex) {
public static void JobCouldNotBeLoaded(this ILogger logger, int jobId, Exception ex)
{
_jobCouldNotBeLoaded(logger, jobId, ex);
}
public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, string jobId, Exception ex) {
public static void ExceptionOccuredWhileExecutingJob(this ILogger logger, string jobId, Exception ex)
{
_exceptionOccuredWhileExecutingJob(logger, jobId, ex);
}
}
......
using Cap.Consistency.Consumer;
using Cap.Consistency.Job;
using Cap.Consistency.Kafka;
using DotNetCore.CAP;
using DotNetCore.CAP.Job;
using DotNetCore.CAP.Kafka;
namespace Microsoft.Extensions.DependencyInjection
{
public static class ConsistencyBuilderExtensions
{
public static ConsistencyBuilder AddKafka(this ConsistencyBuilder builder) {
public static CapBuilder AddKafka(this CapBuilder builder)
{
builder.Services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
builder.Services.AddTransient<IJobProcessor, KafkaJobProcessor>();
......
......@@ -10,7 +10,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Cap.Consistency\Cap.Consistency.csproj" />
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
<ItemGroup>
......
using System;
using System.Text;
using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Cap.Consistency.RabbitMQ
namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitMQConsumerClient : IConsumerClient
{
......@@ -22,14 +21,16 @@ namespace Cap.Consistency.RabbitMQ
public event EventHandler<DeliverMessage> MessageReceieved;
public RabbitMQConsumerClient(string exchange, string hostName) {
public RabbitMQConsumerClient(string exchange, string hostName)
{
_exchange = exchange;
_hostName = hostName;
InitClient();
}
private void InitClient() {
private void InitClient()
{
_connectionFactory = new ConnectionFactory { HostName = _hostName };
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
......@@ -37,7 +38,8 @@ namespace Cap.Consistency.RabbitMQ
_queueName = _channel.QueueDeclare().QueueName;
}
public void Listening(TimeSpan timeout) {
public void Listening(TimeSpan timeout)
{
// Task.Delay(timeout).Wait();
var consumer = new EventingBasicConsumer(_channel);
......@@ -45,21 +47,26 @@ namespace Cap.Consistency.RabbitMQ
_channel.BasicConsume(_queueName, true, consumer);
}
public void Subscribe(string topic) {
public void Subscribe(string topic)
{
_channel.QueueBind(_queueName, _exchange, topic);
}
public void Subscribe(string topic, int partition) {
public void Subscribe(string topic, int partition)
{
_channel.QueueBind(_queueName, _exchange, topic);
}
public void Dispose() {
public void Dispose()
{
_channel.Dispose();
_connection.Dispose();
}
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) {
var message = new DeliverMessage {
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
{
var message = new DeliverMessage
{
MessageKey = e.RoutingKey,
Body = e.Body,
Value = Encoding.UTF8.GetString(e.Body)
......
using Cap.Consistency.Consumer;
namespace Cap.Consistency.RabbitMQ
namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
public IConsumerClient Create(string groupId, string clientHostAddress) {
public IConsumerClient Create(string groupId, string clientHostAddress)
{
return new RabbitMQConsumerClient(groupId, clientHostAddress);
}
}
......
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace Cap.Consistency.RabbitMQ
namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitMQProducerClient : IProducerClient
public class RabbitMQProducerClient : ICapProducerService
{
private readonly ConsistencyOptions _options;
private readonly ILogger _logger;
public RabbitMQProducerClient(IOptions<ConsistencyOptions> options, ILoggerFactory loggerFactory) {
public RabbitMQProducerClient(IOptions<ConsistencyOptions> options, ILoggerFactory loggerFactory)
{
_options = options.Value;
_logger = loggerFactory.CreateLogger(nameof(RabbitMQProducerClient));
}
public Task SendAsync(string topic, string content) {
public Task SendAsync(string topic, string content)
{
var factory = new ConnectionFactory() { HostName = _options.BrokerUrlList };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel()) {
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs",
type: "topic");
......
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;
using DotNetCore.CAP.Abstractions;
namespace Cap.Consistency.RabbitMQ
namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitMQTopicAttribute : TopicAttribute
{
public RabbitMQTopicAttribute(string routingKey) : base(routingKey) {
public RabbitMQTopicAttribute(string routingKey) : base(routingKey)
{
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
namespace Cap.Consistency.Abstractions
namespace DotNetCore.CAP.Abstractions
{
public class ConsumerContext
{
public ConsumerContext(ConsumerExecutorDescriptor descriptor, DeliverMessage message) {
public ConsumerContext(ConsumerExecutorDescriptor descriptor, DeliverMessage message)
{
ConsumerDescriptor = descriptor ?? throw new ArgumentNullException(nameof(descriptor));
DeliverMessage = message ?? throw new ArgumentNullException(nameof(message));
}
......
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Reflection;
namespace Cap.Consistency.Abstractions
namespace DotNetCore.CAP.Abstractions
{
public class ConsumerExecutorDescriptor
{
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Cap.Consistency.Abstractions
namespace DotNetCore.CAP.Abstractions
{
public class ConsumerInvokerContext
{
public ConsumerInvokerContext(ConsumerContext consumerContext) {
public ConsumerInvokerContext(ConsumerContext consumerContext)
{
ConsumerContext = consumerContext ??
throw new ArgumentNullException(nameof(consumerContext));
}
public ConsumerContext ConsumerContext { get; set; }
public IConsumerInvoker Result { get; set; }
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks;
namespace Cap.Consistency.Abstractions
namespace DotNetCore.CAP.Abstractions
{
public interface IConsumerInvoker
{
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks;
namespace Cap.Consistency.Abstractions.ModelBinding
namespace DotNetCore.CAP.Abstractions.ModelBinding
{
public interface IModelBinder
{
......
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Extensions.Primitives;
namespace Cap.Consistency.Abstractions.ModelBinding
namespace DotNetCore.CAP.Abstractions.ModelBinding
{
public class ModelBindingContext
{
......@@ -17,8 +15,10 @@ namespace Cap.Consistency.Abstractions.ModelBinding
public object Result { get; set; }
public static ModelBindingContext CreateBindingContext(string values, string modelName, Type modelType) {
return new ModelBindingContext() {
public static ModelBindingContext CreateBindingContext(string values, string modelName, Type modelType)
{
return new ModelBindingContext()
{
ModelName = modelName,
ModelType = modelType,
Values = values
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Cap.Consistency.Abstractions
namespace DotNetCore.CAP.Abstractions
{
[AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, Inherited = true, AllowMultiple = true)]
public abstract class TopicAttribute : Attribute
{
readonly string _name;
private readonly string _name;
public TopicAttribute(string topicName) {
public TopicAttribute(string topicName)
{
this._name = topicName;
}
public string Name {
public string Name
{
get { return _name; }
}
......
using System;
using Cap.Consistency;
using DotNetCore.CAP;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace Microsoft.AspNetCore.Builder
{
/// <summary>
/// Consistence extensions for <see cref="IApplicationBuilder"/>
/// </summary>
public static class BuilderExtensions
public static class AppBuilderExtensions
{
///<summary>
/// Enables Consistence for the current application
/// </summary>
/// <param name="app">The <see cref="IApplicationBuilder"/> instance this method extends.</param>
/// <returns>The <see cref="IApplicationBuilder"/> instance this method extends.</returns>
public static IApplicationBuilder UseConsistency(this IApplicationBuilder app) {
if (app == null) {
public static IApplicationBuilder UseConsistency(this IApplicationBuilder app)
{
if (app == null)
{
throw new ArgumentNullException(nameof(app));
}
var marker = app.ApplicationServices.GetService<ConsistencyMarkerService>();
var marker = app.ApplicationServices.GetService<CapMarkerService>();
if (marker == null) {
if (marker == null)
{
throw new InvalidOperationException("Add Consistency must be called on the service collection.");
}
......
using System;
using Cap.Consistency;
using Cap.Consistency.Job;
using DotNetCore.CAP;
using DotNetCore.CAP.Job;
namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Used to verify Consistency service was called on a ServiceCollection
/// </summary>
public class ConsistencyMarkerService { }
public class CapMarkerService { }
public class ConsistencyBuilder
public class CapBuilder
{
public CapBuilder(IServiceCollection services)
{
public ConsistencyBuilder(IServiceCollection services) {
Services = services;
}
public IServiceCollection Services { get; private set; }
private ConsistencyBuilder AddScoped(Type serviceType, Type concreteType) {
private CapBuilder AddScoped(Type serviceType, Type concreteType)
{
Services.AddScoped(serviceType, concreteType);
return this;
}
private ConsistencyBuilder AddSingleton<TService, TImplementation>()
private CapBuilder AddSingleton<TService, TImplementation>()
where TService : class
where TImplementation : class, TService {
where TImplementation : class, TService
{
Services.AddSingleton<TService, TImplementation>();
return this;
}
/// <summary>
/// Adds an <see cref="IConsistencyMessageStore"/> .
/// Adds an <see cref="ICapMessageStore"/> .
/// </summary>
/// <typeparam name="T">The type for the <see cref="IConsistencyMessageStore"/> to add. </typeparam>
/// <returns>The current <see cref="ConsistencyBuilder"/> instance.</returns>
public virtual ConsistencyBuilder AddMessageStore<T>()
where T : class, IConsistencyMessageStore {
return AddScoped(typeof(IConsistencyMessageStore), typeof(T));
/// <typeparam name="T">The type for the <see cref="ICapMessageStore"/> to add. </typeparam>
/// <returns>The current <see cref="CapBuilder"/> instance.</returns>
public virtual CapBuilder AddMessageStore<T>()
where T : class, ICapMessageStore
{
return AddScoped(typeof(ICapMessageStore), typeof(T));
}
public virtual ConsistencyBuilder AddJobs<T>()
where T : class, IJob {
public virtual CapBuilder AddJobs<T>()
where T : class, IJob
{
return AddSingleton<IJob, T>();
}
public virtual ConsistencyBuilder AddProducerClient<T>()
where T:class, IProducerClient {
return AddScoped(typeof(IProducerClient), typeof(T));
public virtual CapBuilder AddProducerClient<T>()
where T : class, ICapProducerService
{
return AddScoped(typeof(ICapProducerService), typeof(T));
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Reflection;
using Cap.Consistency;
using Cap.Consistency.Abstractions.ModelBinding;
using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Internal;
using Cap.Consistency.Job;
using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection.Extensions;
namespace Microsoft.Extensions.DependencyInjection
......@@ -20,11 +19,12 @@ namespace Microsoft.Extensions.DependencyInjection
/// Adds and configures the consistence services for the consitence.
/// </summary>
/// <param name="services">The services available in the application.</param>
/// <returns>An <see cref="ConsistencyBuilder"/> for application services.</returns>
public static ConsistencyBuilder AddConsistency(this IServiceCollection services) {
/// <returns>An <see cref="CapBuilder"/> for application services.</returns>
public static CapBuilder AddConsistency(this IServiceCollection services)
{
services.AddConsistency(x => new ConsistencyOptions());
return new ConsistencyBuilder(services);
return new CapBuilder(services);
}
/// <summary>
......@@ -32,12 +32,12 @@ namespace Microsoft.Extensions.DependencyInjection
/// </summary>
/// <param name="services">The services available in the application.</param>
/// <param name="setupAction">An action to configure the <see cref="ConsistencyOptions"/>.</param>
/// <returns>An <see cref="ConsistencyBuilder"/> for application services.</returns>
public static ConsistencyBuilder AddConsistency(
/// <returns>An <see cref="CapBuilder"/> for application services.</returns>
public static CapBuilder AddConsistency(
this IServiceCollection services,
Action<ConsistencyOptions> setupAction) {
services.TryAddSingleton<ConsistencyMarkerService>();
Action<ConsistencyOptions> setupAction)
{
services.TryAddSingleton<CapMarkerService>();
services.Configure(setupAction);
AddConsumerServices(services);
......@@ -55,27 +55,32 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<IJob, CapJob>();
services.TryAddTransient<DefaultCronJobRegistry>();
services.TryAddScoped<IProducerClient, DefaultProducerClient>();
services.TryAddScoped<ICapProducerService, DefaultProducerService>();
return new ConsistencyBuilder(services);
return new CapBuilder(services);
}
private static void AddConsumerServices(IServiceCollection services) {
private static void AddConsumerServices(IServiceCollection services)
{
var consumerListenerServices = new Dictionary<Type, Type>();
foreach (var rejectedServices in services) {
foreach (var rejectedServices in services)
{
if (rejectedServices.ImplementationType != null
&& typeof(IConsumerService).IsAssignableFrom(rejectedServices.ImplementationType))
consumerListenerServices.Add(typeof(IConsumerService), rejectedServices.ImplementationType);
}
foreach (var service in consumerListenerServices) {
foreach (var service in consumerListenerServices)
{
services.AddSingleton(service.Key, service.Value);
}
var types = Assembly.GetEntryAssembly().ExportedTypes;
foreach (var type in types) {
if (typeof(IConsumerService).IsAssignableFrom(type)) {
foreach (var type in types)
{
if (typeof(IConsumerService).IsAssignableFrom(type))
{
services.AddSingleton(typeof(IConsumerService), type);
}
}
......
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Cap.Consistency</PackageId>
<PackageId>DotNetCore.CAP</PackageId>
<TargetFramework>netstandard1.6</TargetFramework>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50</PackageTargetFallback>
......
using System;
using System.Linq;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace Cap.Consistency
namespace DotNetCore.CAP
{
public class DefaultBootstrapper : IBootstrapper
{
......@@ -19,39 +18,44 @@ namespace Cap.Consistency
public DefaultBootstrapper(
IOptions<ConsistencyOptions> options,
IConsistencyMessageStore storage,
ICapMessageStore storage,
IApplicationLifetime appLifetime,
IServiceProvider provider) {
IServiceProvider provider)
{
Options = options.Value;
Storage = storage;
_appLifetime = appLifetime;
Provider = provider;
Servers = Provider.GetServices<IProcessingServer>();
_cts = new CancellationTokenSource();
_ctsRegistration = appLifetime.ApplicationStopping.Register(() => {
_ctsRegistration = appLifetime.ApplicationStopping.Register(() =>
{
_cts.Cancel();
try {
try
{
_bootstrappingTask?.Wait();
}
catch (OperationCanceledException) {
catch (OperationCanceledException)
{
}
});
}
protected ConsistencyOptions Options { get; }
protected IConsistencyMessageStore Storage { get; }
protected ICapMessageStore Storage { get; }
protected IEnumerable<IProcessingServer> Servers { get; }
public IServiceProvider Provider { get; private set; }
public Task BootstrapAsync() {
public Task BootstrapAsync()
{
return (_bootstrappingTask = BootstrapTaskAsync());
}
private async Task BootstrapTaskAsync() {
private async Task BootstrapTaskAsync()
{
if (_cts.IsCancellationRequested) return;
if (_cts.IsCancellationRequested) return;
......@@ -60,11 +64,14 @@ namespace Cap.Consistency
if (_cts.IsCancellationRequested) return;
foreach (var item in Servers) {
try {
foreach (var item in Servers)
{
try
{
item.Start();
}
catch (Exception) {
catch (Exception)
{
}
}
......@@ -72,9 +79,12 @@ namespace Cap.Consistency
_cts.Dispose();
}
public virtual Task BootstrapCoreAsync() {
_appLifetime.ApplicationStopping.Register(() => {
foreach (var item in Servers) {
public virtual Task BootstrapCoreAsync()
{
_appLifetime.ApplicationStopping.Register(() =>
{
foreach (var item in Servers)
{
item.Dispose();
}
});
......
using System.Threading.Tasks;
namespace Cap.Consistency
namespace DotNetCore.CAP
{
/// <summary>
/// Represents bootstrapping logic. For example, adding initial state to the storage or querying certain entities.
......
using System;
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
namespace Cap.Consistency
namespace DotNetCore.CAP
{
/// <summary>
/// Provides an abstraction for a store which manages consistent message.
/// </summary>
/// <typeparam name="ConsistencyMessage"></typeparam>
public interface IConsistencyMessageStore
public interface ICapMessageStore
{
/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
......@@ -21,7 +20,6 @@ namespace Cap.Consistency
/// </returns>
Task<ConsistencyMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken);
/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
......
using System;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency
namespace DotNetCore.CAP
{
public class DefaultProducerClient : IProducerClient
public class DefaultProducerService : ICapProducerService
{
private readonly IConsistencyMessageStore _store;
private readonly ICapMessageStore _store;
private readonly ILogger _logger;
private readonly CancellationTokenSource _cts;
public DefaultProducerClient(
IConsistencyMessageStore store,
ILogger<DefaultProducerClient> logger) {
public DefaultProducerService(
ICapMessageStore store,
ILogger<DefaultProducerService> logger)
{
_store = store;
_logger = logger;
_cts = new CancellationTokenSource();
}
public Task SendAsync(string topic, string content) {
public Task SendAsync(string topic, string content)
{
if (topic == null) throw new ArgumentNullException(nameof(topic));
if (content == null) throw new ArgumentNullException(nameof(content));
return StoreMessage(topic, content);
}
public Task SendAsync<T>(string topic, T obj) {
public Task SendAsync<T>(string topic, T obj)
{
if (topic == null) throw new ArgumentNullException(nameof(topic));
var content = Helper.ToJson(obj);
......@@ -38,9 +40,10 @@ namespace Cap.Consistency
return StoreMessage(topic, content);
}
private async Task StoreMessage(string topic, string content) {
var message = new ConsistencyMessage {
private async Task StoreMessage(string topic, string content)
{
var message = new ConsistencyMessage
{
Topic = topic,
Payload = content
};
......@@ -49,7 +52,8 @@ namespace Cap.Consistency
WaitHandleEx.PulseEvent.Set();
if (_logger.IsEnabled(LogLevel.Debug)) {
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Enqueuing a topic to be store. topic:{topic}, content:{content}", topic, content);
}
}
......
using System.Threading.Tasks;
namespace DotNetCore.CAP
{
public interface ICapProducerService
{
Task SendAsync(string topic, string content);
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
namespace Cap.Consistency.Consumer
namespace DotNetCore.CAP
{
public interface IConsumerClient : IDisposable
{
......
......@@ -3,7 +3,7 @@ using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Cap.Consistency.Consumer
namespace DotNetCore.CAP
{
public interface IConsumerClientFactory
{
......
......@@ -3,13 +3,13 @@ using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Internal;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Cap.Consistency.Consumer
namespace DotNetCore.CAP
{
public class ConsumerHandler : IConsumerHandler, IDisposable
{
......@@ -21,7 +21,7 @@ namespace Cap.Consistency.Consumer
private readonly MethodMatcherCache _selector;
private readonly ConsistencyOptions _options;
private readonly IConsistencyMessageStore _messageStore;
private readonly ICapMessageStore _messageStore;
private readonly CancellationTokenSource _cts;
public event EventHandler<ConsistencyMessage> MessageReceieved;
......@@ -35,7 +35,7 @@ namespace Cap.Consistency.Consumer
IConsumerInvokerFactory consumerInvokerFactory,
IConsumerClientFactory consumerClientFactory,
ILoggerFactory loggerFactory,
IConsistencyMessageStore messageStore,
ICapMessageStore messageStore,
MethodMatcherCache selector,
IOptions<ConsistencyOptions> options) {
_selector = selector;
......
namespace Cap.Consistency.Consumer
namespace DotNetCore.CAP
{
public interface IConsumerHandler : IProcessingServer
{
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
namespace Cap.Consistency.Consumer
namespace DotNetCore.CAP
{
public interface IConsumerService
{
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Cap.Consistency
namespace DotNetCore.CAP
{
public interface IProcessingServer : IDisposable
{
......
using System;
namespace Cap.Consistency.Infrastructure
namespace DotNetCore.CAP.Infrastructure
{
/// <summary>
/// The default implementation of <see cref="ConsistencyMessage{TKey}"/> which uses a string as a primary key.
......@@ -13,7 +13,8 @@ namespace Cap.Consistency.Infrastructure
/// <remarks>
/// The Id property is initialized to from a new GUID string value.
/// </remarks>
public ConsistencyMessage() {
public ConsistencyMessage()
{
Id = Guid.NewGuid().ToString();
SendTime = DateTime.Now;
UpdateTime = SendTime;
......
using Cap.Consistency.Job;
using DotNetCore.CAP.Job;
namespace Cap.Consistency.Infrastructure
namespace DotNetCore.CAP.Infrastructure
{
/// <summary>
/// Represents all the options you can use to configure the system.
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Cap.Consistency.Infrastructure
namespace DotNetCore.CAP.Infrastructure
{
public class DeliverMessage
{
......@@ -14,7 +10,6 @@ namespace Cap.Consistency.Infrastructure
/// </summary>
public string MessageKey { get; set; }
public byte[] Body { get; set; }
public string Value { get; set; }
......
using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;
namespace Cap.Consistency.Infrastructure
namespace DotNetCore.CAP.Infrastructure
{
internal static class Helper
{
private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private static JsonSerializerSettings SerializerSettings;
public static void SetSerializerSettings(JsonSerializerSettings setting) {
public static void SetSerializerSettings(JsonSerializerSettings setting)
{
SerializerSettings = setting;
}
public static string ToJson(object value) {
public static string ToJson(object value)
{
return value != null
? JsonConvert.SerializeObject(value, SerializerSettings)
: null;
}
public static T FromJson<T>(string value) {
public static T FromJson<T>(string value)
{
return value != null
? JsonConvert.DeserializeObject<T>(value, SerializerSettings)
: default(T);
}
public static object FromJson(string value, Type type) {
public static object FromJson(string value, Type type)
{
if (type == null) throw new ArgumentNullException(nameof(type));
return value != null
......@@ -34,12 +36,14 @@ namespace Cap.Consistency.Infrastructure
: null;
}
public static long ToTimestamp(DateTime value) {
public static long ToTimestamp(DateTime value)
{
var elapsedTime = value - Epoch;
return (long)elapsedTime.TotalSeconds;
}
public static DateTime FromTimestamp(long value) {
public static DateTime FromTimestamp(long value)
{
return Epoch.AddSeconds(value);
}
}
......
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;
using System.Collections.Generic;
using DotNetCore.CAP.Abstractions;
namespace Cap.Consistency.Infrastructure
namespace DotNetCore.CAP.Infrastructure
{
public interface IConsumerExcutorSelector
{
......
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;
using DotNetCore.CAP.Abstractions;
namespace Cap.Consistency.Infrastructure
namespace DotNetCore.CAP.Infrastructure
{
public interface IConsumerInvokerFactory
{
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Cap.Consistency.Infrastructure
namespace DotNetCore.CAP.Infrastructure
{
public static class WaitHandleEx
{
public static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);
public static Task WaitAnyAsync(WaitHandle handle1, WaitHandle handle2, TimeSpan timeout) {
public static Task WaitAnyAsync(WaitHandle handle1, WaitHandle handle2, TimeSpan timeout)
{
var t1 = handle1.WaitOneAsync(timeout);
var t2 = handle2.WaitOneAsync(timeout);
return Task.WhenAny(t1, t2);
}
public static async Task<bool> WaitOneAsync(this WaitHandle handle, TimeSpan timeout) {
public static async Task<bool> WaitOneAsync(this WaitHandle handle, TimeSpan timeout)
{
RegisteredWaitHandle registeredHandle = null;
try {
try
{
var tcs = new TaskCompletionSource<bool>();
registeredHandle = ThreadPool.RegisterWaitForSingleObject(
handle,
......@@ -28,8 +29,10 @@ namespace Cap.Consistency.Infrastructure
true);
return await tcs.Task;
}
finally {
if (registeredHandle != null) {
finally
{
if (registeredHandle != null)
{
registeredHandle.Unregister(null);
}
}
......
......@@ -2,39 +2,41 @@
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection;
namespace Cap.Consistency.Internal
namespace DotNetCore.CAP.Internal
{
public class ConsumerExcutorSelector : IConsumerExcutorSelector
{
private readonly IServiceProvider _serviceProvider;
public ConsumerExcutorSelector(IServiceProvider serviceProvider) {
public ConsumerExcutorSelector(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) {
public ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{
return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
}
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(TopicContext context) {
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(TopicContext context)
{
var consumerServices = context.ServiceProvider.GetServices<IConsumerService>();
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
foreach (var service in consumerServices) {
foreach (var service in consumerServices)
{
var typeInfo = service.GetType().GetTypeInfo();
if (!typeof(IConsumerService).GetTypeInfo().IsAssignableFrom(typeInfo)) {
if (!typeof(IConsumerService).GetTypeInfo().IsAssignableFrom(typeInfo))
{
continue;
}
foreach (var method in typeInfo.DeclaredMethods) {
foreach (var method in typeInfo.DeclaredMethods)
{
var topicAttr = method.GetCustomAttribute<TopicAttribute>(true);
if (topicAttr == null) continue;
......@@ -44,14 +46,18 @@ namespace Cap.Consistency.Internal
return executorDescriptorList;
}
private ConsumerExecutorDescriptor InitDescriptor(TopicAttribute attr,
MethodInfo methodInfo, TypeInfo implType
) {
var descriptor = new ConsumerExecutorDescriptor();
descriptor.Attribute = attr;
descriptor.MethodInfo = methodInfo;
descriptor.ImplTypeInfo = implType;
private ConsumerExecutorDescriptor InitDescriptor(
TopicAttribute attr,
MethodInfo methodInfo,
TypeInfo implType)
{
var descriptor = new ConsumerExecutorDescriptor()
{
Attribute = attr,
MethodInfo = methodInfo,
ImplTypeInfo = implType
};
return descriptor;
}
......
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Abstractions.ModelBinding;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Cap.Consistency.Internal
namespace DotNetCore.CAP.Internal
{
public class ConsumerInvoker : IConsumerInvoker
{
......@@ -22,8 +19,8 @@ namespace Cap.Consistency.Internal
public ConsumerInvoker(ILogger logger,
IServiceProvider serviceProvider,
IModelBinder modelBinder,
ConsumerContext consumerContext) {
ConsumerContext consumerContext)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider;
_modelBinder = modelBinder;
......@@ -32,19 +29,22 @@ namespace Cap.Consistency.Internal
_consumerContext.ConsumerDescriptor.ImplTypeInfo);
}
public Task InvokeAsync() {
try {
using (_logger.BeginScope("consumer invoker begin")) {
public Task InvokeAsync()
{
try
{
using (_logger.BeginScope("consumer invoker begin"))
{
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.Attribute);
try {
try
{
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
var bodyString = Encoding.UTF8.GetString(_consumerContext.DeliverMessage.Body);
if (_executor.MethodParameters.Length > 0) {
if (_executor.MethodParameters.Length > 0)
{
var firstParameter = _executor.MethodParameters[0];
var bindingContext = ModelBindingContext.CreateBindingContext(bodyString,
......@@ -52,23 +52,22 @@ namespace Cap.Consistency.Internal
_modelBinder.BindModelAsync(bindingContext);
_executor.Execute(obj, bindingContext.Result);
}
else {
else
{
_executor.Execute(obj);
}
return Task.CompletedTask;
}
finally {
finally
{
_logger.LogDebug("Executed consumer method .");
}
}
}
finally {
finally
{
}
}
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Abstractions.ModelBinding;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency.Internal
namespace DotNetCore.CAP.Internal
{
public class ConsumerInvokerFactory : IConsumerInvokerFactory
{
......@@ -17,15 +15,15 @@ namespace Cap.Consistency.Internal
public ConsumerInvokerFactory(
ILoggerFactory loggerFactory,
IModelBinder modelBinder,
IServiceProvider serviceProvider) {
IServiceProvider serviceProvider)
{
_logger = loggerFactory.CreateLogger<ConsumerInvokerFactory>();
_modelBinder = modelBinder;
_serviceProvider = serviceProvider;
}
public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext) {
public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext)
{
var context = new ConsumerInvokerContext(consumerContext);
context.Result = new ConsumerInvoker(_logger, _serviceProvider, _modelBinder, consumerContext);
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Collections.Generic;
namespace Cap.Consistency.Internal
namespace DotNetCore.CAP.Internal
{
public class ConsumerMethodExecutor
{
public static object[] PrepareArguments(
IDictionary<string, object> actionParameters,
ObjectMethodExecutor actionMethodExecutor) {
ObjectMethodExecutor actionMethodExecutor)
{
var declaredParameterInfos = actionMethodExecutor.MethodParameters;
var count = declaredParameterInfos.Length;
if (count == 0) {
if (count == 0)
{
return null;
}
var arguments = new object[count];
for (var index = 0; index < count; index++) {
for (var index = 0; index < count; index++)
{
var parameterInfo = declaredParameterInfos[index];
object value;
if (!actionParameters.TryGetValue(parameterInfo.Name, out value)) {
if (!actionParameters.TryGetValue(parameterInfo.Name, out value))
{
value = actionMethodExecutor.GetDefaultValueForParameter(index);
}
......
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Abstractions.ModelBinding;
using DotNetCore.CAP.Abstractions.ModelBinding;
using Newtonsoft.Json;
namespace Cap.Consistency.Internal
namespace DotNetCore.CAP.Internal
{
public class DefaultModelBinder : IModelBinder
{
private Func<object> _modelCreator;
public Task BindModelAsync(ModelBindingContext bindingContext) {
if (bindingContext.Model == null) {
public Task BindModelAsync(ModelBindingContext bindingContext)
{
if (bindingContext.Model == null)
{
bindingContext.Model = CreateModel(bindingContext);
}
bindingContext.Result = JsonConvert.DeserializeObject(bindingContext.Values, bindingContext.ModelType);
return Task.CompletedTask;
}
protected virtual object CreateModel(ModelBindingContext bindingContext) {
if (bindingContext == null) {
protected virtual object CreateModel(ModelBindingContext bindingContext)
{
if (bindingContext == null)
{
throw new ArgumentNullException(nameof(bindingContext));
}
if (_modelCreator == null) {
if (_modelCreator == null)
{
var modelTypeInfo = bindingContext.ModelType.GetTypeInfo();
if (modelTypeInfo.IsAbstract || modelTypeInfo.GetConstructor(Type.EmptyTypes) == null) {
if (modelTypeInfo.IsAbstract || modelTypeInfo.GetConstructor(Type.EmptyTypes) == null)
{
throw new InvalidOperationException();
}
......
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
namespace Cap.Consistency.Internal
namespace DotNetCore.CAP.Internal
{
public class MethodMatcherCache
{
private readonly IConsumerExcutorSelector _selector;
public MethodMatcherCache(IConsumerExcutorSelector selector) {
public MethodMatcherCache(IConsumerExcutorSelector selector)
{
_selector = selector;
}
public ConcurrentDictionary<string, ConsumerExecutorDescriptor> GetCandidatesMethods(TopicContext routeContext) {
if (Entries.Count == 0) {
public ConcurrentDictionary<string, ConsumerExecutorDescriptor> GetCandidatesMethods(TopicContext routeContext)
{
if (Entries.Count == 0)
{
var executorCollection = _selector.SelectCandidates(routeContext);
foreach (var item in executorCollection) {
foreach (var item in executorCollection)
{
Entries.GetOrAdd(item.Attribute.Name, item);
}
}
return Entries;
}
public ConsumerExecutorDescriptor GetTopicExector(string topicName) {
if (Entries == null) {
public ConsumerExecutorDescriptor GetTopicExector(string topicName)
{
if (Entries == null)
{
throw new ArgumentNullException(nameof(Entries));
}
......
......@@ -2,9 +2,8 @@
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace Cap.Consistency.Internal
namespace DotNetCore.CAP.Internal
{
/// <summary>
/// Provides access to the combined list of attributes associated a <see cref="Type"/> or property.
......@@ -15,8 +14,10 @@ namespace Cap.Consistency.Internal
/// Creates a new <see cref="ModelAttributes"/> for a <see cref="Type"/>.
/// </summary>
/// <param name="typeAttributes">The set of attributes for the <see cref="Type"/>.</param>
public ModelAttributes(IEnumerable<object> typeAttributes) {
if (typeAttributes == null) {
public ModelAttributes(IEnumerable<object> typeAttributes)
{
if (typeAttributes == null)
{
throw new ArgumentNullException(nameof(typeAttributes));
}
......@@ -31,12 +32,15 @@ namespace Cap.Consistency.Internal
/// <param name="typeAttributes">
/// The set of attributes for the property's <see cref="Type"/>. See <see cref="PropertyInfo.PropertyType"/>.
/// </param>
public ModelAttributes(IEnumerable<object> propertyAttributes, IEnumerable<object> typeAttributes) {
if (propertyAttributes == null) {
public ModelAttributes(IEnumerable<object> propertyAttributes, IEnumerable<object> typeAttributes)
{
if (propertyAttributes == null)
{
throw new ArgumentNullException(nameof(propertyAttributes));
}
if (typeAttributes == null) {
if (typeAttributes == null)
{
throw new ArgumentNullException(nameof(typeAttributes));
}
......@@ -72,12 +76,15 @@ namespace Cap.Consistency.Internal
/// <param name="property">A <see cref="PropertyInfo"/> for which attributes need to be resolved.
/// </param>
/// <returns>A <see cref="ModelAttributes"/> instance with the attributes of the property.</returns>
public static ModelAttributes GetAttributesForProperty(Type type, PropertyInfo property) {
if (type == null) {
public static ModelAttributes GetAttributesForProperty(Type type, PropertyInfo property)
{
if (type == null)
{
throw new ArgumentNullException(nameof(type));
}
if (property == null) {
if (property == null)
{
throw new ArgumentNullException(nameof(property));
}
......@@ -93,8 +100,10 @@ namespace Cap.Consistency.Internal
/// <param name="type">The <see cref="Type"/> for which attributes need to be resolved.
/// </param>
/// <returns>A <see cref="ModelAttributes"/> instance with the attributes of the <see cref="Type"/>.</returns>
public static ModelAttributes GetAttributesForType(Type type) {
if (type == null) {
public static ModelAttributes GetAttributesForType(Type type)
{
if (type == null)
{
throw new ArgumentNullException(nameof(type));
}
......
......@@ -4,10 +4,9 @@ using System.ComponentModel;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace Cap.Consistency.Internal
namespace DotNetCore.CAP.Internal
{
public class ObjectMethodExecutor
{
......@@ -18,8 +17,10 @@ namespace Cap.Consistency.Internal
private static readonly MethodInfo _convertOfTMethod =
typeof(ObjectMethodExecutor).GetRuntimeMethods().Single(methodInfo => methodInfo.Name == nameof(ObjectMethodExecutor.Convert));
private ObjectMethodExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo) {
if (methodInfo == null) {
private ObjectMethodExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo)
{
if (methodInfo == null)
{
throw new ArgumentNullException(nameof(methodInfo));
}
......@@ -31,13 +32,15 @@ namespace Cap.Consistency.Internal
TaskGenericType = IsMethodAsync ? GetTaskInnerTypeOrNull(MethodReturnType) : null;
//IsTypeAssignableFromIActionResult = typeof(IActionResult).IsAssignableFrom(TaskGenericType ?? MethodReturnType);
if (IsMethodAsync && TaskGenericType != null) {
if (IsMethodAsync && TaskGenericType != null)
{
// For backwards compatibility we're creating a sync-executor for an async method. This was
// supported in the past even though MVC wouldn't have called it.
_executor = GetExecutor(methodInfo, targetTypeInfo);
_executorAsync = GetExecutorAsync(TaskGenericType, methodInfo, targetTypeInfo);
}
else {
else
{
_executor = GetExecutor(methodInfo, targetTypeInfo);
}
......@@ -65,28 +68,34 @@ namespace Cap.Consistency.Internal
//public bool IsTypeAssignableFromIActionResult { get; }
public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo) {
public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo)
{
var executor = new ObjectMethodExecutor(methodInfo, targetTypeInfo);
return executor;
}
public Task<object> ExecuteAsync(object target, params object[] parameters) {
public Task<object> ExecuteAsync(object target, params object[] parameters)
{
return _executorAsync(target, parameters);
}
public object Execute(object target, params object[] parameters) {
public object Execute(object target, params object[] parameters)
{
return _executor(target, parameters);
}
public object GetDefaultValueForParameter(int index) {
if (index < 0 || index > MethodParameters.Length - 1) {
public object GetDefaultValueForParameter(int index)
{
if (index < 0 || index > MethodParameters.Length - 1)
{
throw new ArgumentOutOfRangeException(nameof(index));
}
return _parameterDefaultValues[index];
}
private static ConsumerMethodExecutor GetExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo) {
private static ConsumerMethodExecutor GetExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo)
{
// Parameters to executor
var targetParameter = Expression.Parameter(typeof(object), "target");
var parametersParameter = Expression.Parameter(typeof(object[]), "parameters");
......@@ -94,7 +103,8 @@ namespace Cap.Consistency.Internal
// Build parameter list
var parameters = new List<Expression>();
var paramInfos = methodInfo.GetParameters();
for (int i = 0; i < paramInfos.Length; i++) {
for (int i = 0; i < paramInfos.Length; i++)
{
var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);
......@@ -109,12 +119,14 @@ namespace Cap.Consistency.Internal
// methodCall is "((Ttarget) target) method((T0) parameters[0], (T1) parameters[1], ...)"
// Create function
if (methodCall.Type == typeof(void)) {
if (methodCall.Type == typeof(void))
{
var lambda = Expression.Lambda<VoidActionExecutor>(methodCall, targetParameter, parametersParameter);
var voidExecutor = lambda.Compile();
return WrapVoidAction(voidExecutor);
}
else {
else
{
// must coerce methodCall to match ActionExecutor signature
var castMethodCall = Expression.Convert(methodCall, typeof(object));
var lambda = Expression.Lambda<ConsumerMethodExecutor>(castMethodCall, targetParameter, parametersParameter);
......@@ -122,14 +134,17 @@ namespace Cap.Consistency.Internal
}
}
private static ConsumerMethodExecutor WrapVoidAction(VoidActionExecutor executor) {
return delegate (object target, object[] parameters) {
private static ConsumerMethodExecutor WrapVoidAction(VoidActionExecutor executor)
{
return delegate (object target, object[] parameters)
{
executor(target, parameters);
return null;
};
}
private static ConsumerMethodExecutorAsync GetExecutorAsync(Type taskInnerType, MethodInfo methodInfo, TypeInfo targetTypeInfo) {
private static ConsumerMethodExecutorAsync GetExecutorAsync(Type taskInnerType, MethodInfo methodInfo, TypeInfo targetTypeInfo)
{
// Parameters to executor
var targetParameter = Expression.Parameter(typeof(object), "target");
var parametersParameter = Expression.Parameter(typeof(object[]), "parameters");
......@@ -137,7 +152,8 @@ namespace Cap.Consistency.Internal
// Build parameter list
var parameters = new List<Expression>();
var paramInfos = methodInfo.GetParameters();
for (int i = 0; i < paramInfos.Length; i++) {
for (int i = 0; i < paramInfos.Length; i++)
{
var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);
......@@ -162,7 +178,8 @@ namespace Cap.Consistency.Internal
private static Expression GetCoerceMethodCallExpression(
Type taskValueType,
MethodCallExpression methodCall,
MethodInfo methodInfo) {
MethodInfo methodInfo)
{
var castMethodCall = Expression.Convert(methodCall, typeof(object));
// for: public Task<T> Action()
// constructs: return (Task<object>)Convert<T>((Task<T>)result)
......@@ -175,26 +192,32 @@ namespace Cap.Consistency.Internal
/// <summary>
/// Cast Task of T to Task of object
/// </summary>
private static async Task<object> CastToObject<T>(Task<T> task) {
private static async Task<object> CastToObject<T>(Task<T> task)
{
return (object)await task;
}
private static Type GetTaskInnerTypeOrNull(Type type) {
private static Type GetTaskInnerTypeOrNull(Type type)
{
var genericType = ExtractGenericInterface(type, typeof(Task<>));
return genericType?.GenericTypeArguments[0];
}
public static Type ExtractGenericInterface(Type queryType, Type interfaceType) {
if (queryType == null) {
public static Type ExtractGenericInterface(Type queryType, Type interfaceType)
{
if (queryType == null)
{
throw new ArgumentNullException(nameof(queryType));
}
if (interfaceType == null) {
if (interfaceType == null)
{
throw new ArgumentNullException(nameof(interfaceType));
}
if (IsGenericInstantiation(queryType, interfaceType)) {
if (IsGenericInstantiation(queryType, interfaceType))
{
// queryType matches (i.e. is a closed generic type created from) the open generic type.
return queryType;
}
......@@ -208,70 +231,86 @@ namespace Cap.Consistency.Internal
return GetGenericInstantiation(queryType, interfaceType);
}
private static bool IsGenericInstantiation(Type candidate, Type interfaceType) {
private static bool IsGenericInstantiation(Type candidate, Type interfaceType)
{
return
candidate.GetTypeInfo().IsGenericType &&
candidate.GetGenericTypeDefinition() == interfaceType;
}
private static Type GetGenericInstantiation(Type queryType, Type interfaceType) {
private static Type GetGenericInstantiation(Type queryType, Type interfaceType)
{
Type bestMatch = null;
var interfaces = queryType.GetInterfaces();
foreach (var @interface in interfaces) {
if (IsGenericInstantiation(@interface, interfaceType)) {
if (bestMatch == null) {
foreach (var @interface in interfaces)
{
if (IsGenericInstantiation(@interface, interfaceType))
{
if (bestMatch == null)
{
bestMatch = @interface;
}
else if (StringComparer.Ordinal.Compare(@interface.FullName, bestMatch.FullName) < 0) {
else if (StringComparer.Ordinal.Compare(@interface.FullName, bestMatch.FullName) < 0)
{
bestMatch = @interface;
}
else {
else
{
// There are two matches at this level of the class hierarchy, but @interface is after
// bestMatch in the sort order.
}
}
}
if (bestMatch != null) {
if (bestMatch != null)
{
return bestMatch;
}
// BaseType will be null for object and interfaces, which means we've reached 'bottom'.
var baseType = queryType?.GetTypeInfo().BaseType;
if (baseType == null) {
if (baseType == null)
{
return null;
}
else {
else
{
return GetGenericInstantiation(baseType, interfaceType);
}
}
private static Task<object> Convert<T>(object taskAsObject) {
private static Task<object> Convert<T>(object taskAsObject)
{
var task = (Task<T>)taskAsObject;
return CastToObject<T>(task);
}
private static object[] GetParameterDefaultValues(ParameterInfo[] parameters) {
private static object[] GetParameterDefaultValues(ParameterInfo[] parameters)
{
var values = new object[parameters.Length];
for (var i = 0; i < parameters.Length; i++) {
for (var i = 0; i < parameters.Length; i++)
{
var parameterInfo = parameters[i];
object defaultValue;
if (parameterInfo.HasDefaultValue) {
if (parameterInfo.HasDefaultValue)
{
defaultValue = parameterInfo.DefaultValue;
}
else {
else
{
var defaultValueAttribute = parameterInfo
.GetCustomAttribute<DefaultValueAttribute>(inherit: false);
if (defaultValueAttribute?.Value == null) {
if (defaultValueAttribute?.Value == null)
{
defaultValue = parameterInfo.ParameterType.GetTypeInfo().IsValueType
? Activator.CreateInstance(parameterInfo.ParameterType)
: null;
}
else {
else
{
defaultValue = defaultValueAttribute.Value;
}
}
......
using System;
using System.Collections.Generic;
using System.Text;
using NCrontab;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
public class ComputedCronJob
{
private CronJobRegistry.Entry _entry;
public ComputedCronJob() {
public ComputedCronJob()
{
}
public ComputedCronJob(CronJob job) {
public ComputedCronJob(CronJob job)
{
Job = job;
Schedule = CrontabSchedule.Parse(job.Cron);
if (job.TypeName != null) {
if (job.TypeName != null)
{
JobType = Type.GetType(job.TypeName);
}
}
public ComputedCronJob(CronJob job, CronJobRegistry.Entry entry)
: this(job) {
: this(job)
{
_entry = entry;
}
......@@ -40,11 +42,13 @@ namespace Cap.Consistency.Job
public RetryBehavior RetryBehavior => _entry.RetryBehavior;
public void Update(DateTime baseTime) {
public void Update(DateTime baseTime)
{
Job.LastRun = baseTime;
}
public void UpdateNext(DateTime now) {
public void UpdateNext(DateTime now)
{
var next = Schedule.GetNextOccurrence(now);
var previousNext = Schedule.GetNextOccurrence(Job.LastRun);
Next = next > previousNext ? now : next;
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
public class Cron
{
/// <summary>
/// Returns cron expression that fires every minute.
/// </summary>
public static string Minutely() {
public static string Minutely()
{
return "* * * * *";
}
/// <summary>
/// Returns cron expression that fires every hour at the first minute.
/// </summary>
public static string Hourly() {
public static string Hourly()
{
return Hourly(minute: 0);
}
......@@ -24,14 +24,16 @@ namespace Cap.Consistency.Job
/// Returns cron expression that fires every hour at the specified minute.
/// </summary>
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Hourly(int minute) {
public static string Hourly(int minute)
{
return string.Format("{0} * * * *", minute);
}
/// <summary>
/// Returns cron expression that fires every day at 00:00 UTC.
/// </summary>
public static string Daily() {
public static string Daily()
{
return Daily(hour: 0);
}
......@@ -40,7 +42,8 @@ namespace Cap.Consistency.Job
/// the specified hour in UTC.
/// </summary>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
public static string Daily(int hour) {
public static string Daily(int hour)
{
return Daily(hour, minute: 0);
}
......@@ -50,14 +53,16 @@ namespace Cap.Consistency.Job
/// </summary>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Daily(int hour, int minute) {
public static string Daily(int hour, int minute)
{
return string.Format("{0} {1} * * *", minute, hour);
}
/// <summary>
/// Returns cron expression that fires every week at Monday, 00:00 UTC.
/// </summary>
public static string Weekly() {
public static string Weekly()
{
return Weekly(DayOfWeek.Monday);
}
......@@ -66,7 +71,8 @@ namespace Cap.Consistency.Job
/// day of the week.
/// </summary>
/// <param name="dayOfWeek">The day of week in which the schedule will be activated.</param>
public static string Weekly(DayOfWeek dayOfWeek) {
public static string Weekly(DayOfWeek dayOfWeek)
{
return Weekly(dayOfWeek, hour: 0);
}
......@@ -76,7 +82,8 @@ namespace Cap.Consistency.Job
/// </summary>
/// <param name="dayOfWeek">The day of week in which the schedule will be activated.</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
public static string Weekly(DayOfWeek dayOfWeek, int hour) {
public static string Weekly(DayOfWeek dayOfWeek, int hour)
{
return Weekly(dayOfWeek, hour, minute: 0);
}
......@@ -87,7 +94,8 @@ namespace Cap.Consistency.Job
/// <param name="dayOfWeek">The day of week in which the schedule will be activated.</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Weekly(DayOfWeek dayOfWeek, int hour, int minute) {
public static string Weekly(DayOfWeek dayOfWeek, int hour, int minute)
{
return string.Format("{0} {1} * * {2}", minute, hour, (int)dayOfWeek);
}
......@@ -95,7 +103,8 @@ namespace Cap.Consistency.Job
/// Returns cron expression that fires every month at 00:00 UTC of the first
/// day of month.
/// </summary>
public static string Monthly() {
public static string Monthly()
{
return Monthly(day: 1);
}
......@@ -104,7 +113,8 @@ namespace Cap.Consistency.Job
/// day of month.
/// </summary>
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
public static string Monthly(int day) {
public static string Monthly(int day)
{
return Monthly(day, hour: 0);
}
......@@ -114,7 +124,8 @@ namespace Cap.Consistency.Job
/// </summary>
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
public static string Monthly(int day, int hour) {
public static string Monthly(int day, int hour)
{
return Monthly(day, hour, minute: 0);
}
......@@ -125,14 +136,16 @@ namespace Cap.Consistency.Job
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Monthly(int day, int hour, int minute) {
public static string Monthly(int day, int hour, int minute)
{
return string.Format("{0} {1} {2} * *", minute, hour, day);
}
/// <summary>
/// Returns cron expression that fires every year on Jan, 1st at 00:00 UTC.
/// </summary>
public static string Yearly() {
public static string Yearly()
{
return Yearly(month: 1);
}
......@@ -141,7 +154,8 @@ namespace Cap.Consistency.Job
/// of the specified month.
/// </summary>
/// <param name="month">The month in which the schedule will be activated (1-12).</param>
public static string Yearly(int month) {
public static string Yearly(int month)
{
return Yearly(month, day: 1);
}
......@@ -151,7 +165,8 @@ namespace Cap.Consistency.Job
/// </summary>
/// <param name="month">The month in which the schedule will be activated (1-12).</param>
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
public static string Yearly(int month, int day) {
public static string Yearly(int month, int day)
{
return Yearly(month, day, hour: 0);
}
......@@ -162,7 +177,8 @@ namespace Cap.Consistency.Job
/// <param name="month">The month in which the schedule will be activated (1-12).</param>
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
public static string Yearly(int month, int day, int hour) {
public static string Yearly(int month, int day, int hour)
{
return Yearly(month, day, hour, minute: 0);
}
......@@ -174,7 +190,8 @@ namespace Cap.Consistency.Job
/// <param name="day">The day of month in which the schedule will be activated (1-31).</param>
/// <param name="hour">The hour in which the schedule will be activated (0-23).</param>
/// <param name="minute">The minute in which the schedule will be activated (0-59).</param>
public static string Yearly(int month, int day, int hour, int minute) {
public static string Yearly(int month, int day, int hour, int minute)
{
return string.Format("{0} {1} {2} {3} *", minute, hour, day, month);
}
}
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
/// <summary>
/// Represents a cron job to be executed at specified intervals of time.
/// </summary>
public class CronJob
{
public CronJob() {
public CronJob()
{
Id = Guid.NewGuid().ToString();
}
public CronJob(string cron)
: this() {
: this()
{
Cron = cron;
}
public CronJob(string cron, DateTime lastRun)
: this(cron) {
: this(cron)
{
LastRun = lastRun;
}
......
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Options;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
public class DefaultCronJobRegistry : CronJobRegistry
{
private readonly ConsistencyOptions _options;
public DefaultCronJobRegistry(IOptions<ConsistencyOptions> options) {
public DefaultCronJobRegistry(IOptions<ConsistencyOptions> options)
{
_options = options.Value;
RegisterJob<CapJob>(nameof(DefaultCronJobRegistry), _options.CronExp, RetryBehavior.DefaultRetry);
......
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using NCrontab;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
public abstract class CronJobRegistry
{
private List<Entry> _entries;
public CronJobRegistry() {
public CronJobRegistry()
{
_entries = new List<Entry>();
}
protected void RegisterJob<T>(string name, string cron, RetryBehavior retryBehavior = null)
where T : IJob {
where T : IJob
{
RegisterJob(name, typeof(T), cron, retryBehavior);
}
......@@ -26,7 +27,8 @@ namespace Cap.Consistency.Job
/// <param name="jobType">The job's type.</param>
/// <param name="cron">The cron expression to use.</param>
/// <param name="retryBehavior">The <see cref="RetryBehavior"/> to use.</param>
protected void RegisterJob(string name, Type jobType, string cron, RetryBehavior retryBehavior = null) {
protected void RegisterJob(string name, Type jobType, string cron, RetryBehavior retryBehavior = null)
{
if (string.IsNullOrWhiteSpace(name)) throw new ArgumentException(nameof(cron));
if (jobType == null) throw new ArgumentNullException(nameof(jobType));
if (cron == null) throw new ArgumentNullException(nameof(cron));
......@@ -34,7 +36,8 @@ namespace Cap.Consistency.Job
CrontabSchedule.TryParse(cron);
if (!typeof(IJob).GetTypeInfo().IsAssignableFrom(jobType)) {
if (!typeof(IJob).GetTypeInfo().IsAssignableFrom(jobType))
{
throw new ArgumentException(
"Cron jobs should extend IJob.", nameof(jobType));
}
......@@ -46,7 +49,8 @@ namespace Cap.Consistency.Job
public class Entry
{
public Entry(string name, Type jobType, string cron) {
public Entry(string name, Type jobType, string cron)
{
Name = name;
JobType = jobType;
Cron = cron;
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
public class CapJob : IJob
{
public Task ExecuteAsync() {
public Task ExecuteAsync()
{
Console.WriteLine("当前时间:" + DateTime.Now.ToString());
return Task.CompletedTask;
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
public interface IJob
{
......
......@@ -3,13 +3,13 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Job;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Cap.Consistency
namespace DotNetCore.CAP
{
public class JobProcessingServer : IProcessingServer, IDisposable
{
......@@ -29,8 +29,8 @@ namespace Cap.Consistency
ILoggerFactory loggerFactory,
IServiceProvider provider,
DefaultCronJobRegistry defaultJobRegistry,
IOptions<ConsistencyOptions> options) {
IOptions<ConsistencyOptions> options)
{
_logger = logger;
_loggerFactory = loggerFactory;
_provider = provider;
......@@ -39,8 +39,8 @@ namespace Cap.Consistency
_cts = new CancellationTokenSource();
}
public void Start() {
public void Start()
{
var processorCount = Environment.ProcessorCount;
processorCount = 1;
_processors = GetProcessors(processorCount);
......@@ -57,40 +57,50 @@ namespace Cap.Consistency
_compositeTask = Task.WhenAll(processorTasks);
}
public void Dispose() {
if (_disposed) {
public void Dispose()
{
if (_disposed)
{
return;
}
_disposed = true;
_logger.ServerShuttingDown();
_cts.Cancel();
try {
try
{
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds);
}
catch (AggregateException ex) {
catch (AggregateException ex)
{
var innerEx = ex.InnerExceptions[0];
if (!(innerEx is OperationCanceledException)) {
if (!(innerEx is OperationCanceledException))
{
_logger.ExpectedOperationCanceledException(innerEx);
}
}
}
private IJobProcessor InfiniteRetry(IJobProcessor inner) {
private IJobProcessor InfiniteRetry(IJobProcessor inner)
{
return new InfiniteRetryProcessor(inner, _loggerFactory);
}
private IJobProcessor[] GetProcessors(int processorCount) {
private IJobProcessor[] GetProcessors(int processorCount)
{
var returnedProcessors = new List<IJobProcessor>();
for (int i = 0; i < processorCount; i++) {
for (int i = 0; i < processorCount; i++)
{
var processors = _provider.GetServices<IJobProcessor>();
foreach (var processor in processors) {
if (processor is CronJobProcessor) {
foreach (var processor in processors)
{
if (processor is CronJobProcessor)
{
if (i == 0) // only add first cronJob
returnedProcessors.Add(processor);
}
else {
else
{
returnedProcessors.Add(processor);
}
}
......
......@@ -2,12 +2,11 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
public class CronJobProcessor : IJobProcessor
{
......@@ -18,8 +17,8 @@ namespace Cap.Consistency.Job
public CronJobProcessor(
DefaultCronJobRegistry jobRegistry,
ILogger<CronJobProcessor> logger,
IServiceProvider provider) {
IServiceProvider provider)
{
_jobRegistry = jobRegistry;
_logger = logger;
_provider = provider;
......@@ -27,17 +26,20 @@ namespace Cap.Consistency.Job
public override string ToString() => nameof(CronJobProcessor);
public Task ProcessAsync(ProcessingContext context) {
public Task ProcessAsync(ProcessingContext context)
{
if (context == null) throw new ArgumentNullException(nameof(context));
return ProcessCoreAsync(context);
}
private async Task ProcessCoreAsync(ProcessingContext context) {
private async Task ProcessCoreAsync(ProcessingContext context)
{
//var storage = context.Storage;
//var jobs = await GetJobsAsync(storage);
var jobs = GetJobs();
if (!jobs.Any()) {
if (!jobs.Any())
{
_logger.CronJobsNotFound();
// This will cancel this processor.
......@@ -48,52 +50,61 @@ namespace Cap.Consistency.Job
context.ThrowIfStopping();
var computedJobs = Compute(jobs, context.CronJobRegistry.Build());
if (context.IsStopping) {
if (context.IsStopping)
{
return;
}
await Task.WhenAll(computedJobs.Select(j => RunAsync(j, context)));
}
private async Task RunAsync(ComputedCronJob computedJob, ProcessingContext context) {
private async Task RunAsync(ComputedCronJob computedJob, ProcessingContext context)
{
//var storage = context.Storage;
var retryBehavior = computedJob.RetryBehavior;
while (!context.IsStopping) {
while (!context.IsStopping)
{
var now = DateTime.UtcNow;
var due = ComputeDue(computedJob, now);
var timeSpan = due - now;
if (timeSpan.TotalSeconds > 0) {
if (timeSpan.TotalSeconds > 0)
{
await context.WaitAsync(timeSpan);
}
context.ThrowIfStopping();
using (var scopedContext = context.CreateScope()) {
using (var scopedContext = context.CreateScope())
{
var provider = scopedContext.Provider;
var job = provider.GetService<IJob>();
var success = true;
try {
try
{
var sw = Stopwatch.StartNew();
await job.ExecuteAsync();
sw.Stop();
computedJob.Retries = 0;
_logger.CronJobExecuted(computedJob.Job.Name, sw.Elapsed.TotalSeconds);
}
catch (Exception ex) {
catch (Exception ex)
{
success = false;
if (computedJob.Retries == 0) {
if (computedJob.Retries == 0)
{
computedJob.FirstTry = DateTime.UtcNow;
}
computedJob.Retries++;
_logger.CronJobFailed(computedJob.Job.Name, ex);
}
if (success) {
if (success)
{
//var connection = provider.GetRequiredService<IStorageConnection>();
//await connection.AttachCronJobAsync(computedJob.Job);
......@@ -105,24 +116,28 @@ namespace Cap.Consistency.Job
}
}
private DateTime ComputeDue(ComputedCronJob computedJob, DateTime now) {
private DateTime ComputeDue(ComputedCronJob computedJob, DateTime now)
{
computedJob.UpdateNext(now);
var retryBehavior = computedJob.RetryBehavior ?? RetryBehavior.DefaultRetry;
var retries = computedJob.Retries;
if (retries == 0) {
if (retries == 0)
{
return computedJob.Next;
}
var realNext = computedJob.Schedule.GetNextOccurrence(now);
if (!retryBehavior.Retry) {
if (!retryBehavior.Retry)
{
// No retry. If job failed before, we don't care, just schedule it next as usual.
return realNext;
}
if (retries >= retryBehavior.RetryCount) {
if (retries >= retryBehavior.RetryCount)
{
// Max retries. Just schedule it for the next occurance.
return realNext;
}
......@@ -131,11 +146,14 @@ namespace Cap.Consistency.Job
return computedJob.FirstTry.AddSeconds(retryBehavior.RetryIn(retries));
}
private CronJob[] GetJobs() {
private CronJob[] GetJobs()
{
var cronJobs = new List<CronJob>();
var entries = _jobRegistry.Build() ?? new CronJobRegistry.Entry[0];
foreach (var entry in entries) {
cronJobs.Add(new CronJob {
foreach (var entry in entries)
{
cronJobs.Add(new CronJob
{
Name = entry.Name,
TypeName = entry.JobType.AssemblyQualifiedName,
Cron = entry.Cron,
......@@ -148,7 +166,8 @@ namespace Cap.Consistency.Job
private ComputedCronJob[] Compute(IEnumerable<CronJob> jobs, CronJobRegistry.Entry[] entries)
=> jobs.Select(j => CreateComputedCronJob(j, entries)).ToArray();
private ComputedCronJob CreateComputedCronJob(CronJob job, CronJobRegistry.Entry[] entries) {
private ComputedCronJob CreateComputedCronJob(CronJob job, CronJobRegistry.Entry[] entries)
{
var entry = entries.First(e => e.Name == job.Name);
return new ComputedCronJob(job, entry);
}
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
public class InfiniteRetryProcessor : IJobProcessor
{
......@@ -13,22 +11,28 @@ namespace Cap.Consistency.Job
public InfiniteRetryProcessor(
IJobProcessor inner,
ILoggerFactory loggerFactory) {
ILoggerFactory loggerFactory)
{
_inner = inner;
_logger = loggerFactory.CreateLogger<InfiniteRetryProcessor>();
}
public override string ToString() => _inner.ToString();
public async Task ProcessAsync(ProcessingContext context) {
while (!context.IsStopping) {
try {
public async Task ProcessAsync(ProcessingContext context)
{
while (!context.IsStopping)
{
try
{
await _inner.ProcessAsync(context);
}
catch (OperationCanceledException) {
catch (OperationCanceledException)
{
return;
}
catch (Exception ex) {
catch (Exception ex)
{
_logger.LogWarning(
1,
ex,
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
public interface IJobProcessor
{
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
public class ProcessingContext : IDisposable
{
private IServiceScope _scope;
private ProcessingContext(ProcessingContext other) {
private ProcessingContext(ProcessingContext other)
{
Provider = other.Provider;
//Storage = other.Storage;
CronJobRegistry = other.CronJobRegistry;
CancellationToken = other.CancellationToken;
}
public ProcessingContext() {
public ProcessingContext()
{
}
public ProcessingContext(
IServiceProvider provider,
//IStorage storage,
CronJobRegistry cronJobRegistry,
CancellationToken cancellationToken) {
CancellationToken cancellationToken)
{
Provider = provider;
//Storage = storage;
CronJobRegistry = cronJobRegistry;
......@@ -44,21 +45,26 @@ namespace Cap.Consistency.Job
public void ThrowIfStopping() => CancellationToken.ThrowIfCancellationRequested();
public ProcessingContext CreateScope() {
public ProcessingContext CreateScope()
{
var serviceScope = Provider.CreateScope();
return new ProcessingContext(this) {
return new ProcessingContext(this)
{
_scope = serviceScope,
Provider = serviceScope.ServiceProvider
};
}
public Task WaitAsync(TimeSpan timeout) {
public Task WaitAsync(TimeSpan timeout)
{
return Task.Delay(timeout, CancellationToken);
}
public void Dispose() {
if (_scope != null) {
public void Dispose()
{
if (_scope != null)
{
_scope.Dispose();
}
}
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Cap.Consistency.Job
namespace DotNetCore.CAP.Job
{
public class RetryBehavior
{
......@@ -16,7 +14,8 @@ namespace Cap.Consistency.Job
private Func<int, int> _retryInThunk;
static RetryBehavior() {
static RetryBehavior()
{
DefaultRetryCount = 25;
DefaultRetryInThunk = retries =>
(int)Math.Round(Math.Pow(retries - 1, 4) + 15 + (_random.Next(30) * (retries)));
......@@ -26,7 +25,8 @@ namespace Cap.Consistency.Job
}
public RetryBehavior(bool retry)
: this(retry, DefaultRetryCount, DefaultRetryInThunk) {
: this(retry, DefaultRetryCount, DefaultRetryInThunk)
{
}
/// <summary>
......@@ -35,8 +35,10 @@ namespace Cap.Consistency.Job
/// <param name="retry">Whether to retry.</param>
/// <param name="retryCount">The maximum retry count.</param>
/// <param name="retryInThunk">The retry in function to use.</param>
public RetryBehavior(bool retry, int retryCount, Func<int, int> retryInThunk) {
if (retry) {
public RetryBehavior(bool retry, int retryCount, Func<int, int> retryInThunk)
{
if (retry)
{
if (retryCount < 0) throw new ArgumentOutOfRangeException(nameof(retryCount), "Can't be negative.");
}
......@@ -62,7 +64,8 @@ namespace Cap.Consistency.Job
/// </summary>
/// <param name="retries">The current retry count.</param>
/// <returns>The seconds to delay.</returns>
public int RetryIn(int retries) {
public int RetryIn(int retries)
{
return _retryInThunk(retries);
}
}
......
using System;
using System.Collections.Generic;
using System.Linq;
using Cap.Consistency.Job;
using DotNetCore.CAP.Job;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency
namespace DotNetCore.CAP
{
internal static class LoggerExtensions
{
......@@ -17,7 +17,8 @@ namespace Cap.Consistency
private static Action<ILogger, string, double, Exception> _cronJobExecuted;
private static Action<ILogger, string, Exception> _cronJobFailed;
static LoggerExtensions() {
static LoggerExtensions()
{
_serverStarting = LoggerMessage.Define<int, int>(
LogLevel.Debug,
1,
......@@ -52,37 +53,41 @@ namespace Cap.Consistency
LogLevel.Warning,
4,
"Cron job '{jobName}' failed to execute.");
}
public static void ServerStarting(this ILogger logger, int machineProcessorCount, int processorCount) {
public static void ServerStarting(this ILogger logger, int machineProcessorCount, int processorCount)
{
_serverStarting(logger, machineProcessorCount, processorCount, null);
}
public static void ServerShuttingDown(this ILogger logger) {
public static void ServerShuttingDown(this ILogger logger)
{
_serverShuttingDown(logger, null);
}
public static void ExpectedOperationCanceledException(this ILogger logger, Exception ex) {
public static void ExpectedOperationCanceledException(this ILogger logger, Exception ex)
{
_expectedOperationCanceledException(logger, ex.Message, ex);
}
public static void CronJobsNotFound(this ILogger logger) {
public static void CronJobsNotFound(this ILogger logger)
{
_cronJobsNotFound(logger, null);
}
public static void CronJobsScheduling(this ILogger logger, IEnumerable<CronJob> jobs) {
public static void CronJobsScheduling(this ILogger logger, IEnumerable<CronJob> jobs)
{
_cronJobsScheduling(logger, jobs.Count(), null);
}
public static void CronJobExecuted(this ILogger logger, string name, double seconds) {
public static void CronJobExecuted(this ILogger logger, string name, double seconds)
{
_cronJobExecuted(logger, name, seconds, null);
}
public static void CronJobFailed(this ILogger logger, string name, Exception ex) {
public static void CronJobFailed(this ILogger logger, string name, Exception ex)
{
_cronJobFailed(logger, name, ex);
}
}
}
\ No newline at end of file
using System.Collections.Generic;
using System.Linq;
namespace Cap.Consistency
namespace DotNetCore.CAP
{
/// <summary>
/// Represents the result of an consistent message operation.
......@@ -37,9 +37,11 @@ namespace Cap.Consistency
/// </summary>
/// <param name="errors">An optional array of <see cref="OperateError"/>s which caused the operation to fail.</param>
/// <returns>An <see cref="OperateResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.</returns>
public static OperateResult Failed(params OperateError[] errors) {
public static OperateResult Failed(params OperateError[] errors)
{
var result = new OperateResult { Succeeded = false };
if (errors != null) {
if (errors != null)
{
result._errors.AddRange(errors);
}
return result;
......@@ -53,7 +55,8 @@ namespace Cap.Consistency
/// If the operation was successful the ToString() will return "Succeeded" otherwise it returned
/// "Failed : " followed by a comma delimited list of error codes from its <see cref="Errors"/> collection, if any.
/// </remarks>
public override string ToString() {
public override string ToString()
{
return Succeeded ?
"Succeeded" :
string.Format("{0} : {1}", "Failed", string.Join(",", Errors.Select(x => x.Code).ToList()));
......
using System;
using System.Threading;
namespace Cap.Consistency
namespace DotNetCore.CAP
{
public class TopicContext
{
public TopicContext() {
public TopicContext()
{
}
public TopicContext(IServiceProvider provider, CancellationToken cancellationToken) {
public TopicContext(IServiceProvider provider, CancellationToken cancellationToken)
{
ServiceProvider = provider;
CancellationToken = cancellationToken;
}
public IServiceProvider ServiceProvider { get; set; }
public CancellationToken CancellationToken { get; }
......
using System.Reflection;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Cap.Consistency.EntityFrameworkCore.Test")]
[assembly: AssemblyTrademark("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("7442c942-1ddc-40e4-8f1b-654e721eaa45")]
\ No newline at end of file
......@@ -2,7 +2,7 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
namespace Cap.Consistency.EntityFrameworkCore.Test
namespace DotNetCore.CAP.EntityFrameworkCore.Test
{
public static class DbUtil
{
......
//using System.Threading.Tasks;
//using Cap.Consistency.Infrastructure;
//using Cap.Consistency.Store;
//using DotNetCore.CAP.Infrastructure;
//using DotNetCore.CAP.Store;
//using Microsoft.AspNetCore.Builder.Internal;
//using Microsoft.AspNetCore.Testing.xunit;
//using Microsoft.EntityFrameworkCore;
//using Microsoft.Extensions.DependencyInjection;
//using Xunit;
//namespace Cap.Consistency.EntityFrameworkCore.Test
//namespace DotNetCore.CAP.EntityFrameworkCore.Test
//{
// public class DefaultPocoTest : IClassFixture<ScratchDatabaseFixture>
// {
......
......@@ -3,8 +3,8 @@
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AssemblyName>Cap.Consistency.EntityFrameworkCore.Test</AssemblyName>
<PackageId>Cap.Consistency.EntityFrameworkCore.Test</PackageId>
<AssemblyName>DotNetCore.CAP.EntityFrameworkCore.Test</AssemblyName>
<PackageId>DotNetCore.CAP.EntityFrameworkCore.Test</PackageId>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50;portable-net451+win8</PackageTargetFallback>
<RuntimeFrameworkVersion>1.1.1</RuntimeFrameworkVersion>
......@@ -18,8 +18,8 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Cap.Consistency\Cap.Consistency.csproj" />
<ProjectReference Include="..\..\src\Cap.Consistency.EntityFrameworkCore\Cap.Consistency.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.EntityFrameworkCore\DotNetCore.CAP.EntityFrameworkCore.csproj" />
</ItemGroup>
<ItemGroup>
......
//using System;
//using System.Linq;
//using System.Threading.Tasks;
//using Cap.Consistency.Infrastructure;
//using Cap.Consistency.Store;
//using Cap.Consistency.Test;
//using DotNetCore.CAP.Infrastructure;
//using DotNetCore.CAP.Store;
//using DotNetCore.CAP.Test;
//using Microsoft.AspNetCore.Testing;
//using Microsoft.AspNetCore.Testing.xunit;
//using Microsoft.EntityFrameworkCore;
//using Microsoft.Extensions.DependencyInjection;
//using Xunit;
//namespace Cap.Consistency.EntityFrameworkCore.Test
//namespace DotNetCore.CAP.EntityFrameworkCore.Test
//{
// public class MessageStoreTest : MessageManagerTestBase<ConsistencyMessage>, IClassFixture<ScratchDatabaseFixture>
// {
......
//using System;
//using Cap.Consistency.Infrastructure;
//using Cap.Consistency.Store;
//using Cap.Consistency.Test;
//using DotNetCore.CAP.Infrastructure;
//using DotNetCore.CAP.Store;
//using DotNetCore.CAP.Test;
//using Microsoft.AspNetCore.Testing;
//using Microsoft.Extensions.DependencyInjection;
//using Xunit;
//namespace Cap.Consistency.EntityFrameworkCore.Test
//namespace DotNetCore.CAP.EntityFrameworkCore.Test
//{
// public class MessageStoreWithGenericsTest : MessageManagerTestBase<MessageWithGenerics, string>, IClassFixture<ScratchDatabaseFixture>
// {
......
......@@ -6,7 +6,7 @@ using System.Runtime.InteropServices;
// associated with an assembly.
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Cap.Consistency.EntityFrameworkCore")]
[assembly: AssemblyProduct("DotNetCore.CAP.EntityFrameworkCore.Test")]
[assembly: AssemblyTrademark("")]
// Setting ComVisible to false makes the types in this assembly not visible
......@@ -15,4 +15,4 @@ using System.Runtime.InteropServices;
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("96111249-c4c3-4dc9-a887-32d583723ab1")]
\ No newline at end of file
[assembly: Guid("7442c942-1ddc-40e4-8f1b-654e721eaa45")]
\ No newline at end of file
using System;
using Microsoft.EntityFrameworkCore.Internal;
namespace Cap.Consistency.EntityFrameworkCore.Test
namespace DotNetCore.CAP.EntityFrameworkCore.Test
{
public class ScratchDatabaseFixture : IDisposable
{
......
......@@ -4,7 +4,7 @@ using System.Data.SqlClient;
using System.IO;
using System.Threading;
namespace Cap.Consistency.EntityFrameworkCore.Test
namespace DotNetCore.CAP.EntityFrameworkCore.Test
{
public class SqlServerTestStore : IDisposable
{
......
using System.IO;
using Microsoft.Extensions.Configuration;
namespace Cap.Consistency.EntityFrameworkCore.Test
namespace DotNetCore.CAP.EntityFrameworkCore.Test
{
public class TestEnvironment
{
......
//using System;
//using System.Threading;
//using System.Threading.Tasks;
//using Cap.Consistency.Infrastructure;
//using Cap.Consistency.Store;
//using DotNetCore.CAP.Infrastructure;
//using DotNetCore.CAP.Store;
//using Microsoft.Extensions.DependencyInjection;
//using Xunit;
//namespace Cap.Consistency.Test
//namespace DotNetCore.CAP.Test
//{
// public class ConsistencyBuilderTest
// {
......
//using System;
//using System.Threading;
//using System.Threading.Tasks;
//using Cap.Consistency.Infrastructure;
//using Cap.Consistency.Store;
//using DotNetCore.CAP.Infrastructure;
//using DotNetCore.CAP.Store;
//using Microsoft.AspNetCore.Http;
//using Microsoft.Extensions.DependencyInjection;
//using Microsoft.Extensions.Logging;
//using Moq;
//using Xunit;
//namespace Cap.Consistency.Test
//namespace DotNetCore.CAP.Test
//{
// public class ConsistencyMessageManagerTest
// {
......
namespace Cap.Consistency.Test
namespace CDotNetCore.CAPTest
{
public class ConsistencyOptionsTest
{
......
......@@ -3,8 +3,8 @@
<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AssemblyName>Cap.Consistency.Test</AssemblyName>
<PackageId>Cap.Consistency.Test</PackageId>
<AssemblyName>DotNetCore.CAP.Test</AssemblyName>
<PackageId>DotNetCore.CAP.Test</PackageId>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50;portable-net451+win8</PackageTargetFallback>
<RuntimeFrameworkVersion>1.1.1</RuntimeFrameworkVersion>
......@@ -15,7 +15,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Cap.Consistency\Cap.Consistency.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
<ItemGroup>
......
//using System;
//using System.Threading;
//using System.Threading.Tasks;
//using Cap.Consistency.Infrastructure;
//using Cap.Consistency.Store;
//using DotNetCore.CAP.Infrastructure;
//using DotNetCore.CAP.Store;
//namespace Cap.Consistency.Test
//namespace DotNetCore.CAP.Test
//{
// public class NoopMessageStore : IConsistencyMessageStore
// {
......
using System.Linq;
using Xunit;
namespace Cap.Consistency.Test
namespace DotNetCore.CAP.Test
{
public class OperateResultTest
{
......
......@@ -4,15 +4,15 @@
//using System.Linq.Expressions;
//using System.Security.Claims;
//using System.Threading.Tasks;
//using Cap.Consistency.Infrastructure;
//using Cap.Consistency.Store;
//using DotNetCore.CAP.Infrastructure;
//using DotNetCore.CAP.Store;
//using Microsoft.AspNetCore.Builder;
//using Microsoft.AspNetCore.Http;
//using Microsoft.Extensions.DependencyInjection;
//using Microsoft.Extensions.Logging;
//using Xunit;
//namespace Cap.Consistency.Test
//namespace DotNetCore.CAP.Test
//{
// public abstract class MessageManagerTestBase<TMessage> : MessageManagerTestBase<TMessage, string>
// where TMessage : ConsistencyMessage
......
using System;
using Cap.Consistency.Infrastructure;
using DotNetCore.CAP.Infrastructure;
namespace Cap.Consistency.Test
namespace DotNetCore.CAP.Test
{
}
......@@ -2,7 +2,7 @@ using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency.Test
namespace DotNetCore.CAP.Test
{
public interface ITestLogger
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment