Commit cc0c5363 authored by yangxiaodong's avatar yangxiaodong

add server

parent 612c63d1
......@@ -23,6 +23,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Shared", "Shared", "{9E5A7F
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Cap.Consistency.Server", "src\Cap.Consistency.Server\Cap.Consistency.Server.xproj", "{55CF2C48-D390-40CF-8AD9-FA39F90E9217}"
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Cap.Consistency.EntityFrameworkCore", "src\Cap.Consistency.EntityFrameworkCore\Cap.Consistency.EntityFrameworkCore.xproj", "{96111249-C4C3-4DC9-A887-32D583723AB1}"
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Cap.Consistency.EntityFrameworkCore.Test", "test\Cap.Consistency.EntityFrameworkCore.Test\Cap.Consistency.EntityFrameworkCore.Test.xproj", "{7442C942-1DDC-40E4-8F1B-654E721EAA45}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -41,6 +45,14 @@ Global
{55CF2C48-D390-40CF-8AD9-FA39F90E9217}.Debug|Any CPU.Build.0 = Debug|Any CPU
{55CF2C48-D390-40CF-8AD9-FA39F90E9217}.Release|Any CPU.ActiveCfg = Release|Any CPU
{55CF2C48-D390-40CF-8AD9-FA39F90E9217}.Release|Any CPU.Build.0 = Release|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{96111249-C4C3-4DC9-A887-32D583723AB1}.Release|Any CPU.Build.0 = Release|Any CPU
{7442C942-1DDC-40E4-8F1B-654E721EAA45}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7442C942-1DDC-40E4-8F1B-654E721EAA45}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7442C942-1DDC-40E4-8F1B-654E721EAA45}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7442C942-1DDC-40E4-8F1B-654E721EAA45}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -50,5 +62,7 @@ Global
{3A444CF8-1611-407F-8D32-5D0CDC3DD49D} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{9E5A7F49-8E31-4A71-90CC-1DA9AEDA99EE} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{55CF2C48-D390-40CF-8AD9-FA39F90E9217} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{96111249-C4C3-4DC9-A887-32D583723AB1} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{7442C942-1DDC-40E4-8F1B-654E721EAA45} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
EndGlobalSection
EndGlobal
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" />
<PropertyGroup Label="Globals">
<ProjectGuid>96111249-c4c3-4dc9-a887-32d583723ab1</ProjectGuid>
<RootNamespace>Cap.Consistency.EntityFrameworkCore</RootNamespace>
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath>
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath>
<TargetFrameworkVersion>v4.6</TargetFrameworkVersion>
</PropertyGroup>
<PropertyGroup>
<SchemaVersion>2.0</SchemaVersion>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.targets" Condition="'$(VSToolsPath)' != ''" />
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.DependencyInjection;
using Cap.Consistency;
using Cap.Consistency.EntityFrameworkCore;
namespace Cap.Consistency.EntityFrameworkCore
{
/// <summary>
/// Contains extension methods to <see cref="ConsistencyBuilder"/> for adding entity framework stores.
/// </summary>
public static class ConsistencyEntityFrameworkBuilderExtensions
{
/// <summary>
/// Adds an Entity Framework implementation of identity information stores.
/// </summary>
/// <typeparam name="TContext">The Entity Framework database context to use.</typeparam>
/// <param name="builder">The <see cref="ConsistencyBuilder"/> instance this method extends.</param>
/// <returns>The <see cref="ConsistencyBuilder"/> instance this method extends.</returns>
public static ConsistencyBuilder AddEntityFrameworkStores<TContext>(this ConsistencyBuilder builder)
where TContext : DbContext {
builder.Services.TryAdd(GetDefaultServices(builder.MessageType, typeof(TContext)));
return builder;
}
/// <summary>
/// Adds an Entity Framework implementation of identity information stores.
/// </summary>
/// <typeparam name="TContext">The Entity Framework database context to use.</typeparam>
/// <typeparam name="TKey">The type of the primary key used for the users and roles.</typeparam>
/// <param name="builder">The <see cref="ConsistencyBuilder"/> instance this method extends.</param>
/// <returns>The <see cref="ConsistencyBuilder"/> instance this method extends.</returns>
public static ConsistencyBuilder AddEntityFrameworkStores<TContext, TKey>(this ConsistencyBuilder builder)
where TContext : DbContext
where TKey : IEquatable<TKey> {
builder.Services.TryAdd(GetDefaultServices(builder.MessageType, typeof(TContext), typeof(TKey)));
return builder;
}
private static IServiceCollection GetDefaultServices(Type messageType, Type contextType, Type keyType = null) {
Type messageStoreType;
keyType = keyType ?? typeof(string);
messageStoreType = typeof(IConsistencyMessageStore<>).MakeGenericType(messageType, contextType, keyType);
var services = new ServiceCollection();
services.AddScoped(
typeof(IConsistencyMessageStore<>).MakeGenericType(messageStoreType),
messageStoreType);
return services;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Cap.Consistency.EntityFrameworkCore
{
public class ConsistencyMessage : ConsistencyMessage<string>
{
public ConsistencyMessage() {
Id = Guid.NewGuid().ToString();
}
}
public enum MessageStatus
{
Deleted = 0,
WaitForSend = 1,
RollbackSuccessed = 3,
RollbackFailed = 4
}
public class ConsistencyMessage<TKey> where TKey : IEquatable<TKey>
{
public virtual TKey Id { get; set; }
public virtual DateTime SendTime { get; set; }
public string Payload { get; set; }
public MessageStatus Status { get; set; }
public virtual DateTime? UpdateTime { get; set; }
}
}
using Cap.Consistency;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading;
using Microsoft.EntityFrameworkCore;
using System.ComponentModel;
namespace Cap.Consistency.EntityFrameworkCore
{
public class ConsistencyMessageStore : ConsistencyMessageStore<ConsistencyMessage, DbContext, string>
{
public ConsistencyMessageStore(DbContext context) : base(context) { }
}
public class ConsistencyMessageStore<TMessage> : ConsistencyMessageStore<TMessage, DbContext, string>
where TMessage : ConsistencyMessage<string>
{
public ConsistencyMessageStore(DbContext context) : base(context) { }
}
public class ConsistencyMessageStore<TMessage, TContext> : ConsistencyMessageStore<TMessage, TContext, string>
where TMessage : ConsistencyMessage<string>
where TContext : DbContext
{
public ConsistencyMessageStore(TContext context) : base(context) { }
}
public abstract class ConsistencyMessageStore<TMessage, TContext, TKey> : IConsistencyMessageStore<TMessage>
where TMessage : ConsistencyMessage<TKey>
where TContext : DbContext
where TKey : IEquatable<TKey>
{
private bool _disposed;
public ConsistencyMessageStore(TContext context) {
if (context == null) {
throw new ArgumentNullException(nameof(context));
}
Context = context;
}
public TContext Context { get; private set; }
private DbSet<TMessage> MessageSet { get { return Context.Set<TMessage>(); } }
/// <summary>
/// Creates the specified <paramref name="user"/> in the consistency message store.
/// </summary>
/// <param name="message">The message to create.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/> of the creation operation.</returns>
public async virtual Task<OperateResult> CreateAsync(TMessage message, CancellationToken cancellationToken) {
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
if (message == null) {
throw new ArgumentNullException(nameof(message));
}
Context.Add(message);
await SaveChanges(cancellationToken);
return OperateResult.Success;
}
/// <summary>
/// Deletes the specified <paramref name="message"/> from the consistency message store.
/// </summary>
/// <param name="message">The message to delete.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/> of the update operation.</returns>
public async virtual Task<OperateResult> DeleteAsync(TMessage message, CancellationToken cancellationToken) {
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
if (message == null) {
throw new ArgumentNullException(nameof(message));
}
Context.Remove(message);
try {
await SaveChanges(cancellationToken);
}
catch (DbUpdateConcurrencyException ex) {
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message });
}
return OperateResult.Success;
}
/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
/// </summary>
/// <param name="messageId">The message ID to search for.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the user matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
public virtual Task<TMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken) {
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
var id = ConvertIdFromString(messageId);
return MessageSet.FindAsync(new object[] { id }, cancellationToken);
}
/// <summary>
/// Converts the provided <paramref name="id"/> to a strongly typed key object.
/// </summary>
/// <param name="id">The id to convert.</param>
/// <returns>An instance of <typeparamref name="TKey"/> representing the provided <paramref name="id"/>.</returns>
public virtual TKey ConvertIdFromString(string id) {
if (id == null) {
return default(TKey);
}
return (TKey)TypeDescriptor.GetConverter(typeof(TKey)).ConvertFromInvariantString(id);
}
/// <summary>
/// Converts the provided <paramref name="id"/> to its string representation.
/// </summary>
/// <param name="id">The id to convert.</param>
/// <returns>An <see cref="string"/> representation of the provided <paramref name="id"/>.</returns>
public virtual string ConvertIdToString(TKey id) {
if (object.Equals(id, default(TKey))) {
return null;
}
return id.ToString();
}
public Task<string> GetMessageIdAsync(TMessage message, CancellationToken cancellationToken) {
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
if (message == null) {
throw new ArgumentNullException(nameof(message));
}
return Task.FromResult(ConvertIdToString(message.Id));
}
public async virtual Task<OperateResult> UpdateAsync(TMessage message, CancellationToken cancellationToken) {
cancellationToken.ThrowIfCancellationRequested();
ThrowIfDisposed();
if (message == null) {
throw new ArgumentNullException(nameof(message));
}
Context.Attach(message);
message.UpdateTime = DateTime.Now;
Context.Update(message);
try {
await SaveChanges(cancellationToken);
}
catch (DbUpdateConcurrencyException ex) {
return OperateResult.Failed(new OperateError() { Code = "DbUpdateConcurrencyException", Description = ex.Message });
}
return OperateResult.Success;
}
/// <summary>
/// Gets or sets a flag indicating if changes should be persisted after CreateAsync, UpdateAsync and DeleteAsync are called.
/// </summary>
/// <value>
/// True if changes should be automatically persisted, otherwise false.
/// </value>
public bool AutoSaveChanges { get; set; } = true;
/// <summary>Saves the current store.</summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation.</returns>
protected Task SaveChanges(CancellationToken cancellationToken) {
return AutoSaveChanges ? Context.SaveChangesAsync(cancellationToken) : Task.CompletedTask;
}
/// <summary>
/// Throws if this class has been disposed.
/// </summary>
protected void ThrowIfDisposed() {
if (_disposed) {
throw new ObjectDisposedException(GetType().Name);
}
}
/// <summary>
/// Dispose the store
/// </summary>
public void Dispose() {
_disposed = true;
}
}
}
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Cap.Consistency.EntityFrameworkCore")]
[assembly: AssemblyTrademark("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("96111249-c4c3-4dc9-a887-32d583723ab1")]
{
"version": "1.0.0-*",
"dependencies": {
"Cap.Consistency": "1.0.0-*",
"Microsoft.EntityFrameworkCore": "1.1.0-*",
"NETStandard.Library": "1.6.1",
"System.ComponentModel.TypeConverter": "4.3.0"
},
"frameworks": {
"netstandard1.6": {
"imports": "dnxcore50"
}
}
}
......@@ -4,13 +4,12 @@ using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.AspNetCore.Hosting.Server.Features;
using System.Reflection;
using Cap.Consistency.Server.Internal.Infrastructure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.AspNetCore.Http.Features;
namespace Cap.Consistency.Server
{
......@@ -19,7 +18,6 @@ namespace Cap.Consistency.Server
private Stack<IDisposable> _disposables;
private readonly IApplicationLifetime _applicationLifetime;
private readonly ILogger _logger;
private readonly IServerAddressesFeature _serverAddresses;
private readonly IConsumer _consumer;
public ConsistencyServer(IOptions<ConsistencyServerOptions> options, IApplicationLifetime applicationLifetime, ILoggerFactory loggerFactory) {
......@@ -39,15 +37,12 @@ namespace Cap.Consistency.Server
_applicationLifetime = applicationLifetime;
_logger = loggerFactory.CreateLogger(typeof(ConsistencyServer).GetTypeInfo().Namespace);
_consumer = Options.ApplicationServices.GetService<IConsumer>();
Features = new FeatureCollection();
_serverAddresses = new ServerAddressesFeature();
Features.Set(_serverAddresses);
}
public IFeatureCollection Features { get; }
public ConsistencyServerOptions Options { get; }
public IFeatureCollection Features { get; set; }
public void Start<TContext>(IHttpApplication<TContext> application) {
if (_disposables != null) {
// The server has already started and/or has not been cleaned up yet
......@@ -56,6 +51,8 @@ namespace Cap.Consistency.Server
_disposables = new Stack<IDisposable>();
var trace = new ConsistencyTrace(_logger);
_consumer.Log = trace;
_disposables.Push(_consumer);
var threadCount = Options.ThreadCount;
......@@ -66,7 +63,12 @@ namespace Cap.Consistency.Server
"ThreadCount must be positive.");
}
_consumer.Start();
try {
_consumer.Start(threadCount);
}
catch (Exception ex) {
throw ex;
}
}
public void Dispose() {
......
using System;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency.Server.Internal.Infrastructure
namespace Cap.Consistency.Server
{
public interface IConsistencyTrace : ILogger
{
......
......@@ -15,7 +15,7 @@ namespace Cap.Consistency.Server
void Stop();
ILogger Log { get; set; }
IConsistencyTrace Log { get; set; }
ConsistencyServerOptions ServerOptions { get; set; }
......
......@@ -6,7 +6,7 @@ namespace Cap.Consistency.Server.Internal.Infrastructure
/// <summary>
/// Summary description for KestrelTrace
/// </summary>
public class ConsistencyTrace
public class ConsistencyTrace : IConsistencyTrace
{
private static readonly Action<ILogger, string, Exception> _connectionStart;
private static readonly Action<ILogger, string, Exception> _connectionStop;
......
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" />
<PropertyGroup Label="Globals">
<ProjectGuid>7442c942-1ddc-40e4-8f1b-654e721eaa45</ProjectGuid>
<RootNamespace>Cap.Consistency.EntityFrameworkCore.Test</RootNamespace>
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath>
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath>
<TargetFrameworkVersion>v4.6</TargetFrameworkVersion>
</PropertyGroup>
<PropertyGroup>
<SchemaVersion>2.0</SchemaVersion>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.targets" Condition="'$(VSToolsPath)' != ''" />
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Cap.Consistency.EntityFrameworkCore.Test
{
public class Class1
{
public Class1()
{
}
}
}
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Cap.Consistency.EntityFrameworkCore.Test")]
[assembly: AssemblyTrademark("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("7442c942-1ddc-40e4-8f1b-654e721eaa45")]
{
"version": "1.0.0-*",
"buildOptions": {
"warningsAsErrors": true,
"compile": {
"include": "../Shared/TestConsistencyMessage.cs"
}
},
"dependencies": {
"xunit": "2.2.0-*",
"Microsoft.AspNetCore.Http": "1.1.0-*",
"Microsoft.Extensions.DependencyInjection": "1.1.0-*",
"dotnet-test-xunit": "2.2.0-*",
"Cap.Consistency": "1.0.0-*",
"Moq": "4.6.36-*",
"Microsoft.Extensions.Logging": "1.1.0-*",
"Cap.Consistency.EntityFrameworkCore": "1.0.0-*"
},
"frameworks": {
"netcoreapp1.1": {
"imports": [ "dnxcore50", "portable-net451+win8" ],
"dependencies": {
"Microsoft.NETCore.App": {
"version": "1.1.0",
"type": "platform"
}
}
}
},
"testRunner": "xunit"
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment