Commit ac0fc62a authored by yangxiaodong's avatar yangxiaodong

add feature of #22.

parent a9e0743f
...@@ -11,7 +11,7 @@ using Microsoft.Extensions.Logging; ...@@ -11,7 +11,7 @@ using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer namespace DotNetCore.CAP.SqlServer
{ {
public class CapPublisher : CapPublisherBase public class CapPublisher : CapPublisherBase, ICallbackPublisher
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly SqlServerOptions _options; private readonly SqlServerOptions _options;
...@@ -61,6 +61,14 @@ namespace DotNetCore.CAP.SqlServer ...@@ -61,6 +61,14 @@ namespace DotNetCore.CAP.SqlServer
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
} }
public Task PublishAsync(string name, object contentObj)
{
using (var conn = new SqlConnection(_options.ConnectionString))
{
return conn.ExecuteAsync(PrepareSql(), contentObj);
}
}
#region private methods #region private methods
private string PrepareSql() private string PrepareSql()
...@@ -68,7 +76,6 @@ namespace DotNetCore.CAP.SqlServer ...@@ -68,7 +76,6 @@ namespace DotNetCore.CAP.SqlServer
return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
} }
#endregion private methods #endregion private methods
} }
} }
\ No newline at end of file
...@@ -7,7 +7,7 @@ using DotNetCore.CAP.Processor; ...@@ -7,7 +7,7 @@ using DotNetCore.CAP.Processor;
namespace DotNetCore.CAP.Abstractions namespace DotNetCore.CAP.Abstractions
{ {
public abstract class CapPublisherBase : ICapPublisher public abstract class CapPublisherBase : ICapPublisher, IDisposable
{ {
protected IDbConnection DbConnection { get; set; } protected IDbConnection DbConnection { get; set; }
protected IDbTransaction DbTranasaction { get; set; } protected IDbTransaction DbTranasaction { get; set; }
...@@ -16,44 +16,46 @@ namespace DotNetCore.CAP.Abstractions ...@@ -16,44 +16,46 @@ namespace DotNetCore.CAP.Abstractions
protected bool IsUsingEF { get; set; } protected bool IsUsingEF { get; set; }
protected IServiceProvider ServiceProvider { get; set; } protected IServiceProvider ServiceProvider { get; set; }
public void Publish<T>(string name, T contentObj) public void Publish<T>(string name, T contentObj, string callbackName = null)
{ {
CheckIsUsingEF(name); CheckIsUsingEF(name);
PrepareConnectionForEF(); PrepareConnectionForEF();
var content = Serialize(contentObj); var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content, DbConnection, DbTranasaction); PublishWithTrans(name, content);
} }
public Task PublishAsync<T>(string name, T contentObj) public Task PublishAsync<T>(string name, T contentObj, string callbackName = null)
{ {
CheckIsUsingEF(name); CheckIsUsingEF(name);
PrepareConnectionForEF(); PrepareConnectionForEF();
var content = Serialize(contentObj); var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, content, DbConnection, DbTranasaction); return PublishWithTransAsync(name, content);
} }
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) public void Publish<T>(string name, T contentObj, IDbConnection dbConnection,
string callbackName = null, IDbTransaction dbTransaction = null)
{ {
CheckIsAdoNet(name); CheckIsAdoNet(name);
PrepareConnectionForAdo(dbConnection, ref dbTransaction); PrepareConnectionForAdo(dbConnection, dbTransaction);
var content = Serialize(contentObj); var content = Serialize(contentObj, callbackName);
PublishWithTrans(name, content, dbConnection, dbTransaction); PublishWithTrans(name, content);
} }
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection,
string callbackName = null, IDbTransaction dbTransaction = null)
{ {
CheckIsAdoNet(name); CheckIsAdoNet(name);
PrepareConnectionForAdo(dbConnection, ref dbTransaction); PrepareConnectionForAdo(dbConnection, dbTransaction);
var content = Serialize(contentObj); var content = Serialize(contentObj, callbackName);
return PublishWithTransAsync(name, content, dbConnection, dbTransaction); return PublishWithTransAsync(name, content);
} }
protected abstract void PrepareConnectionForEF(); protected abstract void PrepareConnectionForEF();
...@@ -64,35 +66,29 @@ namespace DotNetCore.CAP.Abstractions ...@@ -64,35 +66,29 @@ namespace DotNetCore.CAP.Abstractions
#region private methods #region private methods
private string Serialize<T>(T obj) private string Serialize<T>(T obj, string callbackName = null)
{ {
string content = string.Empty; var message = new Message(obj)
if (Helper.IsComplexType(typeof(T)))
{ {
content = Helper.ToJson(obj); CallbackName = callbackName
} };
else
{ return Helper.ToJson(message);
content = obj.ToString();
}
return content;
} }
private void PrepareConnectionForAdo(IDbConnection dbConnection, ref IDbTransaction dbTransaction) private void PrepareConnectionForAdo(IDbConnection dbConnection, IDbTransaction dbTransaction)
{ {
if (dbConnection == null) DbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection));
throw new ArgumentNullException(nameof(dbConnection)); if (DbConnection.State != ConnectionState.Open)
if (dbConnection.State != ConnectionState.Open)
{ {
IsCapOpenedConn = true; IsCapOpenedConn = true;
dbConnection.Open(); DbConnection.Open();
} }
DbTranasaction = dbTransaction;
if (dbTransaction == null) if (DbTranasaction == null)
{ {
IsCapOpenedTrans = true; IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); DbTranasaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
} }
} }
...@@ -111,7 +107,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -111,7 +107,7 @@ namespace DotNetCore.CAP.Abstractions
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
} }
private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) private async Task PublishWithTransAsync(string name, string content)
{ {
var message = new CapPublishedMessage var message = new CapPublishedMessage
{ {
...@@ -120,23 +116,14 @@ namespace DotNetCore.CAP.Abstractions ...@@ -120,23 +116,14 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
}; };
await ExecuteAsync(dbConnection, dbTransaction, message); await ExecuteAsync(DbConnection, DbTranasaction, message);
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
}
if (IsCapOpenedConn) ClosedCap();
{
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set(); PublishQueuer.PulseEvent.Set();
} }
private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) private void PublishWithTrans(string name, string content)
{ {
var message = new CapPublishedMessage var message = new CapPublishedMessage
{ {
...@@ -145,19 +132,30 @@ namespace DotNetCore.CAP.Abstractions ...@@ -145,19 +132,30 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
}; };
Execute(dbConnection, dbTransaction, message); Execute(DbConnection, DbTranasaction, message);
ClosedCap();
PublishQueuer.PulseEvent.Set();
}
private void ClosedCap()
{
if (IsCapOpenedTrans) if (IsCapOpenedTrans)
{ {
dbTransaction.Commit(); DbTranasaction.Commit();
dbTransaction.Dispose(); DbTranasaction.Dispose();
} }
if (IsCapOpenedConn) if (IsCapOpenedConn)
{ {
dbConnection.Dispose(); DbConnection.Dispose();
} }
}
PublishQueuer.PulseEvent.Set(); public void Dispose()
{
DbTranasaction?.Dispose();
DbConnection?.Dispose();
} }
#endregion private methods #endregion private methods
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace DotNetCore.CAP
{
public interface ICallbackPublisher
{
Task PublishAsync(string name, object obj);
}
}
...@@ -18,7 +18,8 @@ namespace DotNetCore.CAP ...@@ -18,7 +18,8 @@ namespace DotNetCore.CAP
/// <typeparam name="T">The type of conetent object.</typeparam> /// <typeparam name="T">The type of conetent object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param> /// <param name="contentObj">message body content, that will be serialized of json.</param>
Task PublishAsync<T>(string name, T contentObj); /// <param name="callbackName">callback subscriber name</param>
Task PublishAsync<T>(string name, T contentObj, string callbackName = null);
/// <summary> /// <summary>
/// (EntityFramework) Publish a object message. /// (EntityFramework) Publish a object message.
...@@ -30,24 +31,27 @@ namespace DotNetCore.CAP ...@@ -30,24 +31,27 @@ namespace DotNetCore.CAP
/// <typeparam name="T">The type of conetent object.</typeparam> /// <typeparam name="T">The type of conetent object.</typeparam>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param> /// <param name="contentObj">message body content, that will be serialized of json.</param>
void Publish<T>(string name, T contentObj); /// <param name="callbackName">callback subscriber name</param>
void Publish<T>(string name, T contentObj, string callbackName = null);
/// <summary> /// <summary>
/// (ado.net) Asynchronous publish a object message. /// (ado.net) Asynchronous publish a object message.
/// </summary> /// </summary>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param> /// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param> /// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param> /// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null); Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, string callbackName = null, IDbTransaction dbTransaction = null);
/// <summary> /// <summary>
/// (ado.net) Publish a object message. /// (ado.net) Publish a object message.
/// </summary> /// </summary>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param> /// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param> /// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param> /// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null); void Publish<T>(string name, T contentObj, IDbConnection dbConnection, string callbackName = null, IDbTransaction dbTransaction = null);
} }
} }
\ No newline at end of file
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
...@@ -37,31 +39,47 @@ namespace DotNetCore.CAP.Internal ...@@ -37,31 +39,47 @@ namespace DotNetCore.CAP.Internal
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider,
_consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
var value = _consumerContext.DeliverMessage.Content; var jsonConent = _consumerContext.DeliverMessage.Content;
var message = Helper.FromJson<Message>(jsonConent);
object returnObj = null;
if (_executor.MethodParameters.Length > 0) if (_executor.MethodParameters.Length > 0)
{ {
var firstParameter = _executor.MethodParameters[0]; var firstParameter = _executor.MethodParameters[0];
try try
{ {
var binder = _modelBinderFactory.CreateBinder(firstParameter); var binder = _modelBinderFactory.CreateBinder(firstParameter);
var result = await binder.BindModelAsync(value); var result = await binder.BindModelAsync(message.Content.ToString());
if (result.IsSuccess) if (result.IsSuccess)
{ {
_executor.Execute(obj, result.Model); returnObj = _executor.Execute(obj, result.Model);
} }
else else
{ {
_logger.LogWarning($"Parameters:{firstParameter.Name} bind failed! the content is:" + value); _logger.LogWarning($"Parameters:{firstParameter.Name} bind failed! the content is:" + jsonConent);
} }
} }
catch (FormatException ex) catch (FormatException ex)
{ {
_logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, value, ex); _logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, jsonConent, ex);
} }
} }
else else
{ {
_executor.Execute(obj); returnObj = _executor.Execute(obj);
}
//TODO :refactor
if (returnObj != null && !string.IsNullOrEmpty(message.CallbackName))
{
var publisher = _serviceProvider.GetRequiredService<ICallbackPublisher>();
var callbackMessage = new Message(returnObj)
{
Id = message.Id,
Timestamp = DateTime.Now
};
await publisher.PublishAsync(message.CallbackName, callbackMessage);
} }
} }
} }
......
...@@ -3,6 +3,7 @@ using System.Reflection; ...@@ -3,6 +3,7 @@ using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions.ModelBinding; using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP.Internal namespace DotNetCore.CAP.Internal
{ {
...@@ -20,7 +21,11 @@ namespace DotNetCore.CAP.Internal ...@@ -20,7 +21,11 @@ namespace DotNetCore.CAP.Internal
try try
{ {
var type = _parameterInfo.ParameterType; var type = _parameterInfo.ParameterType;
var value = Helper.FromJson(content, type);
var message = Helper.FromJson<Message>(content);
var value = Helper.FromJson(message.Content.ToString(), type);
return Task.FromResult(ModelBindingResult.Success(value)); return Task.FromResult(ModelBindingResult.Success(value));
} }
catch (Exception) catch (Exception)
......
using System;
namespace DotNetCore.CAP.Models
{
public class Message
{
public string Id { get; set; }
public DateTime Timestamp { get; set; }
public object Content { get; set; }
public string CallbackName { get; set; }
public Message()
{
Id = ObjectId.GenerateNewStringId();
Timestamp = DateTime.Now;
}
public Message(object content) : this()
{
Content = content;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment