Commit 8c69e4d2 authored by yangxiaodong's avatar yangxiaodong

refactor

parent e9c4e926
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Producer;
using Cap.Consistency.Store;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Cap.Consistency.Kafka
{
public class KafkaProducerClient : IProducerClient
{
private readonly ConsistencyOptions _options;
private readonly ILogger _logger;
public KafkaProducerClient(IOptions<ConsistencyOptions> options, ILoggerFactory loggerFactory, IConsistencyMessageStore store) {
_options = options.Value;
_logger = loggerFactory.CreateLogger(nameof(KafkaProducerClient));
}
public Task SendAsync(string topic, string content) {
var config = new Dictionary<string, object> { { "bootstrap.servers", _options.BrokerUrlList } };
try {
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) {
var task = producer.ProduceAsync(topic, null, content);
task.ContinueWith(g => {
_logger.LogInformation($"publish message to topic:{topic}\r\n -------content:{content}\r\n ");
});
//producer.Flush(1000);
return Task.CompletedTask;
}
}
catch (Exception e) {
_logger.LogError(new EventId(1), e, $"publish message to [topic:{topic}] ,content:{content}");
return Task.CompletedTask;
}
}
}
}
\ No newline at end of file
...@@ -19,15 +19,4 @@ namespace Cap.Consistency.Infrastructure ...@@ -19,15 +19,4 @@ namespace Cap.Consistency.Infrastructure
public string Value { get; set; } public string Value { get; set; }
} }
public class KafkaDeliverMessage : DeliverMessage
{
public int Partition { get; set; }
public long Offset { get; set; }
public string MessageId { get; set; }
}
} }
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Cap.Consistency.Producer
{
public interface IProducerClient
{
Task SendAsync(string topic, string content);
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace Cap.Consistency.Store
{
/// <summary>
/// Provides the APIs for managing message in a persistence store.
/// </summary>
/// <typeparam name="ConsistencyMessage">The type encapsulating a message.</typeparam>
public class ConsistencyMessageManager: IDisposable
{
private bool _disposed;
private readonly HttpContext _context;
private CancellationToken CancellationToken => _context?.RequestAborted ?? CancellationToken.None;
/// <summary>
/// Constructs a new instance of <see cref="ConsistencyMessageManager{ConsistencyMessage}"/>.
/// </summary>
/// <param name="store">The persistence store the manager will operate over.</param>
/// <param name="services">The <see cref="IServiceProvider"/> used to resolve services.</param>
/// <param name="logger">The logger used to log messages, warnings and errors.</param>
public ConsistencyMessageManager(IConsistencyMessageStore store,
IServiceProvider services,
ILogger<ConsistencyMessageManager> logger) {
if (store == null) {
throw new ArgumentNullException(nameof(store));
}
Store = store;
Logger = logger;
if (services != null) {
_context = services.GetService<IHttpContextAccessor>()?.HttpContext;
}
}
/// <summary>
/// Gets or sets the persistence store the manager operates over.
/// </summary>
/// <value>The persistence store the manager operates over.</value>
protected internal IConsistencyMessageStore Store { get; set; }
/// <summary>
/// Gets the <see cref="ILogger"/> used to log messages from the manager.
/// </summary>
/// <value>
/// The <see cref="ILogger"/> used to log messages from the manager.
/// </value>
protected internal virtual ILogger Logger { get; set; }
/// <summary>
/// Creates the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to create.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> CreateAsync(ConsistencyMessage message) {
ThrowIfDisposed();
//todo: validation message fileds is correct
return Store.CreateAsync(message, CancellationToken);
}
/// <summary>
/// Updates the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to update.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> UpdateAsync(ConsistencyMessage message) {
ThrowIfDisposed();
//todo: validation message fileds is correct
return Store.UpdateAsync(message, CancellationToken);
}
/// <summary>
/// Deletes the specified <paramref name="message"/> in the backing store.
/// </summary>
/// <param name="message">The message to delete.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the <see cref="OperateResult"/>
/// of the operation.
/// </returns>
public virtual Task<OperateResult> DeleteAsync(ConsistencyMessage message) {
ThrowIfDisposed();
if (message == null) {
throw new ArgumentNullException(nameof(message));
}
return Store.DeleteAsync(message, CancellationToken);
}
/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
/// </summary>
/// <param name="messageId">The message ID to search for.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the user matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
public virtual Task<ConsistencyMessage> FindByIdAsync(string messageId) {
ThrowIfDisposed();
return Store.FindByIdAsync(messageId, CancellationToken);
}
/// <summary>
/// Gets the message identifier for the specified <paramref name="message"/>.
/// </summary>
/// <param name="message">The message whose identifier should be retrieved.</param>
/// <returns>The <see cref="Task"/> that represents the asynchronous operation, containing the identifier for the specified <paramref name="message"/>.</returns>
public virtual async Task<string> GeConsistencyMessageIdAsync(ConsistencyMessage message) {
ThrowIfDisposed();
return await Store.GeConsistencyMessageIdAsync(message, CancellationToken);
}
public void Dispose() {
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Releases the unmanaged resources used by the message manager and optionally releases the managed resources.
/// </summary>
/// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing) {
if (disposing && !_disposed) {
_disposed = true;
}
}
protected void ThrowIfDisposed() {
if (_disposed) {
throw new ObjectDisposedException(GetType().Name);
}
}
}
}
\ No newline at end of file
using System;
using System.Threading;
using System.Threading.Tasks;
using Cap.Consistency.Infrastructure;
namespace Cap.Consistency.Store
{
/// <summary>
/// Provides an abstraction for a store which manages consistent message.
/// </summary>
/// <typeparam name="ConsistencyMessage"></typeparam>
public interface IConsistencyMessageStore
{
/// <summary>
/// Finds and returns a message, if any, who has the specified <paramref name="messageId"/>.
/// </summary>
/// <param name="messageId">The message ID to search for.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>
/// The <see cref="Task"/> that represents the asynchronous operation, containing the message matching the specified <paramref name="messageId"/> if it exists.
/// </returns>
Task<ConsistencyMessage> FindByIdAsync(string messageId, CancellationToken cancellationToken);
/// <summary>
/// Creates a new message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to create in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> CreateAsync(ConsistencyMessage message, CancellationToken cancellationToken);
/// <summary>
/// Updates a message in a store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to update in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> UpdateAsync(ConsistencyMessage message, CancellationToken cancellationToken);
/// <summary>
/// Deletes a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message to delete in the store.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that represents the <see cref="OperateResult"/> of the asynchronous query.</returns>
Task<OperateResult> DeleteAsync(ConsistencyMessage message, CancellationToken cancellationToken);
/// <summary>
/// Gets the ID for a message from the store as an asynchronous operation.
/// </summary>
/// <param name="message">The message whose ID should be returned.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used to propagate notifications that the operation should be canceled.</param>
/// <returns>A <see cref="Task{TResult}"/> that contains the ID of the message.</returns>
Task<string> GeConsistencyMessageIdAsync(ConsistencyMessage message, CancellationToken cancellationToken);
}
}
\ No newline at end of file
using System.Collections.Generic;
using System.Linq;
namespace Cap.Consistency.Store
{
/// <summary>
/// Represents the result of an consistent message operation.
/// </summary>
public class OperateResult
{
// ReSharper disable once InconsistentNaming
private static readonly OperateResult _success = new OperateResult { Succeeded = true };
// ReSharper disable once FieldCanBeMadeReadOnly.Local
private List<OperateError> _errors = new List<OperateError>();
/// <summary>
/// Flag indicating whether if the operation succeeded or not.
/// </summary>
public bool Succeeded { get; set; }
/// <summary>
/// An <see cref="IEnumerable{T}"/> of <see cref="OperateError"/>s containing an errors
/// that occurred during the operation.
/// </summary>
/// <value>An <see cref="IEnumerable{T}"/> of <see cref="OperateError"/>s.</value>
public IEnumerable<OperateError> Errors => _errors;
/// <summary>
/// Returns an <see cref="OperateResult"/> indicating a successful identity operation.
/// </summary>
/// <returns>An <see cref="OperateResult"/> indicating a successful operation.</returns>
public static OperateResult Success => _success;
/// <summary>
/// Creates an <see cref="OperateResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.
/// </summary>
/// <param name="errors">An optional array of <see cref="OperateError"/>s which caused the operation to fail.</param>
/// <returns>An <see cref="OperateResult"/> indicating a failed operation, with a list of <paramref name="errors"/> if applicable.</returns>
public static OperateResult Failed(params OperateError[] errors) {
var result = new OperateResult { Succeeded = false };
if (errors != null) {
result._errors.AddRange(errors);
}
return result;
}
/// <summary>
/// Converts the value of the current <see cref="OperateResult"/> object to its equivalent string representation.
/// </summary>
/// <returns>A string representation of the current <see cref="OperateResult"/> object.</returns>
/// <remarks>
/// If the operation was successful the ToString() will return "Succeeded" otherwise it returned
/// "Failed : " followed by a comma delimited list of error codes from its <see cref="Errors"/> collection, if any.
/// </remarks>
public override string ToString() {
return Succeeded ?
"Succeeded" :
string.Format("{0} : {1}", "Failed", string.Join(",", Errors.Select(x => x.Code).ToList()));
}
}
/// <summary>
/// Encapsulates an error from the operate subsystem.
/// </summary>
public class OperateError
{
/// <summary>
/// Gets or sets ths code for this error.
/// </summary>
public string Code { get; set; }
/// <summary>
/// Gets or sets the description for this error.
/// </summary>
public string Description { get; set; }
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment