Commit e3cb23da authored by yangxiaodong's avatar yangxiaodong

refactor publisher base class.

parent c730b18f
...@@ -9,13 +9,10 @@ namespace DotNetCore.CAP.Abstractions ...@@ -9,13 +9,10 @@ namespace DotNetCore.CAP.Abstractions
{ {
public abstract class CapPublisherBase : ICapPublisher public abstract class CapPublisherBase : ICapPublisher
{ {
protected IDbConnection _dbConnection; protected IDbConnection DbConnection { get; set; }
protected IDbTransaction _dbTranasaction; protected IDbTransaction DbTranasaction { get; set; }
protected bool IsCapOpenedTrans { get; set; } protected bool IsCapOpenedTrans { get; set; }
protected bool IsUsingEF { get; set; } protected bool IsUsingEF { get; set; }
protected IServiceProvider ServiceProvider { get; set; } protected IServiceProvider ServiceProvider { get; set; }
public void Publish<T>(string name, T contentObj) public void Publish<T>(string name, T contentObj)
...@@ -25,7 +22,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -25,7 +22,7 @@ namespace DotNetCore.CAP.Abstractions
var content = Serialize(contentObj); var content = Serialize(contentObj);
PublishWithTrans(name, content, _dbConnection, _dbTranasaction); PublishWithTrans(name, content, DbConnection, DbTranasaction);
} }
public Task PublishAsync<T>(string name, T contentObj) public Task PublishAsync<T>(string name, T contentObj)
...@@ -35,7 +32,7 @@ namespace DotNetCore.CAP.Abstractions ...@@ -35,7 +32,7 @@ namespace DotNetCore.CAP.Abstractions
var content = Serialize(contentObj); var content = Serialize(contentObj);
return PublishWithTransAsync(name, content, _dbConnection, _dbTranasaction); return PublishWithTransAsync(name, content, DbConnection, DbTranasaction);
} }
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
...@@ -121,6 +118,12 @@ namespace DotNetCore.CAP.Abstractions ...@@ -121,6 +118,12 @@ namespace DotNetCore.CAP.Abstractions
await ExecuteAsync(dbConnection, dbTransaction, message); await ExecuteAsync(dbConnection, dbTransaction, message);
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set(); PublishQueuer.PulseEvent.Set();
} }
...@@ -133,8 +136,14 @@ namespace DotNetCore.CAP.Abstractions ...@@ -133,8 +136,14 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled StatusName = StatusName.Scheduled
}; };
Execute(dbConnection, dbTransaction, message); Execute(dbConnection, dbTransaction, message);
if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set(); PublishQueuer.PulseEvent.Set();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment