Commit 9b4f3d14 authored by Savorboard's avatar Savorboard

resolve conflict

parents 544325a7 ccd72ce2
......@@ -68,6 +68,7 @@ Task("Pack")
{
Configuration = build.Configuration,
VersionSuffix = build.Version.Suffix,
IncludeSymbols = true,
OutputDirectory = "./artifacts/packages"
};
foreach (var project in build.ProjectFiles)
......
......@@ -2,7 +2,7 @@
<PropertyGroup>
<VersionMajor>2</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionPatch>3</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>
......
......@@ -8,7 +8,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.5" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
......
......@@ -10,8 +10,8 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.5" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="2.0.1" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
......
......@@ -5,8 +5,8 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.5" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="2.0.1" />
</ItemGroup>
<ItemGroup>
......
......@@ -6,7 +6,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.0" />
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.5" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="2.0.0" />
......
......@@ -8,13 +8,14 @@
<PackageTags>$(PackageTags);Kafka</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<PropertyGroup>
<WarningsAsErrors>NU1605;NU1701</WarningsAsErrors>
<NoWarn>NU1701</NoWarn>
<NoWarn>NU1701;CS1591</NoWarn>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.Kafka.xml</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.11.2" />
<PackageReference Include="Confluent.Kafka" Version="0.11.3" />
</ItemGroup>
<ItemGroup>
......
......@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.Kafka
public event EventHandler<MessageContext> OnMessageReceived;
public event EventHandler<string> OnError;
public event EventHandler<LogMessageEventArgs> OnLog;
public void Subscribe(IEnumerable<string> topics)
{
......@@ -34,7 +34,6 @@ namespace DotNetCore.CAP.Kafka
if (_consumerClient == null)
InitKafkaClient();
//_consumerClient.Assign(topics.Select(x=> new TopicPartition(x, 0)));
_consumerClient.Subscribe(topics);
}
......@@ -55,7 +54,7 @@ namespace DotNetCore.CAP.Kafka
public void Reject()
{
// Ignore, Kafka will not commit offset when not commit.
_consumerClient.Assign(_consumerClient.Assignment);
}
public void Dispose()
......@@ -76,12 +75,17 @@ namespace DotNetCore.CAP.Kafka
_consumerClient.OnError += ConsumerClient_OnError;
}
private void ConsumerClient_OnConsumeError(object sender, Message e)
{
var message = e.Deserialize<Null, string>(null, StringDeserializer);
OnError?.Invoke(sender, $"An error occurred during consume the message; Topic:'{e.Topic}'," +
$"Message:'{message.Value}', Reason:'{e.Error}'.");
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.ConsumeError,
Reason = $"An error occurred during consume the message; Topic:'{e.Topic}'," +
$"Message:'{message.Value}', Reason:'{e.Error}'."
};
OnLog?.Invoke(sender, logArgs);
}
private void ConsumerClient_OnMessage(object sender, Message<Null, string> e)
......@@ -98,7 +102,12 @@ namespace DotNetCore.CAP.Kafka
private void ConsumerClient_OnError(object sender, Error e)
{
OnError?.Invoke(sender, e.ToString());
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.ServerConnError,
Reason = e.ToString()
};
OnLog?.Invoke(sender, logArgs);
}
#endregion private methods
......
......@@ -62,14 +62,12 @@ namespace DotNetCore.CAP.MySql
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}
protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return Task.CompletedTask;
}
#region private methods
......
......@@ -8,11 +8,16 @@
<PackageTags>$(PackageTags);MySQL</PackageTags>
</PropertyGroup>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.MySql.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.0" />
<PackageReference Include="MySqlConnector" Version="0.28.2" />
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.1" />
<PackageReference Include="MySqlConnector" Version="0.34.2" />
</ItemGroup>
<ItemGroup>
......
using System;
using System.Data;
using System.Threading;
using Dapper;
using Dapper;
using DotNetCore.CAP.Models;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlFetchedMessage : IFetchedMessage
{
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly IDbConnection _connection;
private readonly object _lockObject = new object();
private readonly Timer _timer;
private readonly IDbTransaction _transaction;
private readonly MySqlOptions _options;
private readonly string _processId;
public MySqlFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
public MySqlFetchedMessage(int messageId, MessageType type, string processId, MySqlOptions options)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
_processId = processId;
_options = options;
}
public int MessageId { get; }
......@@ -32,43 +24,25 @@ namespace DotNetCore.CAP.MySql
public void RemoveFromQueue()
{
lock (_lockObject)
using (var connection = new MySqlConnection(_options.ConnectionString))
{
_transaction.Commit();
}
connection.Execute($"DELETE FROM `{_options.TableNamePrefix}.queue` WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}
public void Requeue()
{
lock (_lockObject)
using (var connection = new MySqlConnection(_options.ConnectionString))
{
_transaction.Rollback();
connection.Execute($"UPDATE `{_options.TableNamePrefix}.queue` SET `ProcessId`=NULL WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}
public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}
private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
// ignored
}
}
}
\ No newline at end of file
......@@ -53,7 +53,8 @@ namespace DotNetCore.CAP.MySql
$@"
CREATE TABLE IF NOT EXISTS `{prefix}.queue` (
`MessageId` int(11) NOT NULL,
`MessageType` tinyint(4) NOT NULL
`MessageType` tinyint(4) NOT NULL,
`ProcessId` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS `{prefix}.received` (
......@@ -62,8 +63,8 @@ CREATE TABLE IF NOT EXISTS `{prefix}.received` (
`Group` varchar(200) DEFAULT NULL,
`Content` longtext,
`Retries` int(11) DEFAULT NULL,
`Added` datetime(6) NOT NULL,
`ExpiresAt` datetime(6) DEFAULT NULL,
`Added` datetime NOT NULL,
`ExpiresAt` datetime DEFAULT NULL,
`StatusName` varchar(50) NOT NULL,
PRIMARY KEY (`Id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
......@@ -73,8 +74,8 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
`Name` varchar(200) NOT NULL,
`Content` longtext,
`Retries` int(11) DEFAULT NULL,
`Added` datetime(6) NOT NULL,
`ExpiresAt` datetime(6) DEFAULT NULL,
`Added` datetime NOT NULL,
`ExpiresAt` datetime DEFAULT NULL,
`StatusName` varchar(40) NOT NULL,
PRIMARY KEY (`Id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
......
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
......@@ -42,14 +41,12 @@ namespace DotNetCore.CAP.MySql
public Task<IFetchedMessage> FetchNextMessageAsync()
{
var processId = ObjectId.GenerateNewStringId();
var sql = $@"
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE;
DELETE FROM `{_prefix}.queue` LIMIT 1;";
// var sql = $@"
//SELECT @MId:=`MessageId` as MessageId, @MType:=`MessageType` as MessageType FROM `{_prefix}.queue` LIMIT 1;
//DELETE FROM `{_prefix}.queue` where `MessageId` = @MId AND `MessageType`=@MType;";
UPDATE `{_prefix}.queue` SET `ProcessId`=@ProcessId WHERE `ProcessId` IS NULL LIMIT 1;
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;";
return FetchNextMessageCoreAsync(sql);
return FetchNextMessageCoreAsync(sql, processId);
}
public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
......@@ -60,6 +57,7 @@ SELECT * FROM `{_prefix}.published` WHERE Id=LAST_INSERT_ID();";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
connection.Execute("SELECT LAST_INSERT_ID(0)");
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}
......@@ -105,6 +103,7 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
connection.Execute("SELECT LAST_INSERT_ID(0)");
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}
......@@ -118,15 +117,10 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
}
}
public void Dispose()
{
}
public bool ChangePublishedState(int messageId, string state)
{
var sql =
$"UPDATE `{_prefix}.published` SET `Retries`=`Retries`+1,`StatusName` = '{state}' WHERE `Id`={messageId}";
$"UPDATE `{_prefix}.published` SET `Retries`=`Retries`+1,`ExpiresAt`=NULL,`StatusName` = '{state}' WHERE `Id`={messageId}";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
......@@ -137,7 +131,7 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
public bool ChangeReceivedState(int messageId, string state)
{
var sql =
$"UPDATE `{_prefix}.received` SET `Retries`=`Retries`+1,`StatusName` = '{state}' WHERE `Id`={messageId}";
$"UPDATE `{_prefix}.received` SET `Retries`=`Retries`+1,`ExpiresAt`=NULL,`StatusName` = '{state}' WHERE `Id`={messageId}";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
......@@ -145,45 +139,22 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
}
}
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, string processId)
{
//here don't use `using` to dispose
var connection = new MySqlConnection(Options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage = null;
try
{
//fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, args, transaction);
// An anomaly with unknown causes, sometimes QuerySingleOrDefaultAsync can't return expected result.
using (var reader = connection.ExecuteReader(sql, args, transaction))
{
while (reader.Read())
{
fetchedMessage = new FetchedMessage
{
MessageId = (int)reader.GetInt64(0),
MessageType = (MessageType)reader.GetInt64(1)
};
}
}
}
catch (MySqlException)
FetchedMessage fetchedMessage;
using (var connection = new MySqlConnection(Options.ConnectionString))
{
transaction.Dispose();
throw;
fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, new { ProcessId = processId });
}
if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
transaction);
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, processId, Options);
}
public void Dispose()
{
}
}
}
\ No newline at end of file
......@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.MySql
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);";
var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
......@@ -55,7 +55,7 @@ namespace DotNetCore.CAP.MySql
{
if (message == null) throw new ArgumentNullException(nameof(message));
var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);";
var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
......
......@@ -64,14 +64,12 @@ namespace DotNetCore.CAP.PostgreSql
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}
protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return Task.CompletedTask;
}
#region private methods
......
......@@ -8,11 +8,16 @@
<PackageTags>$(PackageTags);PostgreSQL</PackageTags>
</PropertyGroup>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.PostgreSql.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.0" />
<PackageReference Include="Npgsql" Version="3.2.5" />
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.1" />
<PackageReference Include="Npgsql" Version="3.2.6" />
</ItemGroup>
<ItemGroup>
......
......@@ -113,7 +113,7 @@ namespace DotNetCore.CAP.PostgreSql
public bool ChangePublishedState(int messageId, string state)
{
var sql =
$"UPDATE \"{Options.Schema}\".\"published\" SET \"Retries\"=\"Retries\"+1,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
$"UPDATE \"{Options.Schema}\".\"published\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
......@@ -124,7 +124,7 @@ namespace DotNetCore.CAP.PostgreSql
public bool ChangeReceivedState(int messageId, string state)
{
var sql =
$"UPDATE \"{Options.Schema}\".\"received\" SET \"Retries\"=\"Retries\"+1,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
$"UPDATE \"{Options.Schema}\".\"received\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
......@@ -146,6 +146,7 @@ namespace DotNetCore.CAP.PostgreSql
catch (NpgsqlException)
{
transaction.Dispose();
connection.Dispose();
throw;
}
......
......@@ -28,9 +28,9 @@ namespace DotNetCore.CAP
public const string DefaultVHost = "/";
/// <summary>
/// Default exchange name (value: "cap.default.topic").
/// Default exchange name (value: "cap.default.router").
/// </summary>
public const string DefaultExchangeName = "cap.default.topic";
public const string DefaultExchangeName = "cap.default.router";
/// <summary> The topic exchange type. </summary>
public const string ExchangeType = "topic";
......
......@@ -7,6 +7,11 @@
<AssemblyName>DotNetCore.CAP.RabbitMQ</AssemblyName>
<PackageTags>$(PackageTags);RabbitMQ</PackageTags>
</PropertyGroup>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.RabbitMQ.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="5.0.1" />
......
......@@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
......@@ -15,6 +14,7 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;
private IConnection _connection;
private IModel _channel;
private ulong _deliveryTag;
......@@ -32,7 +32,7 @@ namespace DotNetCore.CAP.RabbitMQ
public event EventHandler<MessageContext> OnMessageReceived;
public event EventHandler<string> OnError;
public event EventHandler<LogMessageEventArgs> OnLog;
public void Subscribe(IEnumerable<string> topics)
{
......@@ -47,9 +47,18 @@ namespace DotNetCore.CAP.RabbitMQ
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived;
consumer.Shutdown += OnConsumerShutdown;
consumer.Registered += OnConsumerRegistered;
consumer.Unregistered += OnConsumerUnregistered;
consumer.ConsumerCancelled += OnConsumerConsumerCancelled;
_channel.BasicConsume(_queueName, false, consumer);
while (true)
Task.Delay(timeout, cancellationToken).GetAwaiter().GetResult();
{
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.WaitHandle.WaitOne(timeout);
}
// ReSharper disable once FunctionNeverReturns
}
public void Commit()
......@@ -65,13 +74,14 @@ namespace DotNetCore.CAP.RabbitMQ
public void Dispose()
{
_channel.Dispose();
_connection.Dispose();
}
private void InitClient()
{
var connection = _connectionChannelPool.GetConnection();
_connection = _connectionChannelPool.GetConnection();
_channel = connection.CreateModel();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(
_exchageName,
......@@ -84,6 +94,38 @@ namespace DotNetCore.CAP.RabbitMQ
_channel.QueueDeclare(_queueName, true, false, false, arguments);
}
#region events
private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
{
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerCancelled,
Reason = e.ConsumerTag
};
OnLog?.Invoke(sender, args);
}
private void OnConsumerUnregistered(object sender, ConsumerEventArgs e)
{
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerUnregistered,
Reason = e.ConsumerTag
};
OnLog?.Invoke(sender, args);
}
private void OnConsumerRegistered(object sender, ConsumerEventArgs e)
{
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerRegistered,
Reason = e.ConsumerTag
};
OnLog?.Invoke(sender, args);
}
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
{
_deliveryTag = e.DeliveryTag;
......@@ -98,7 +140,14 @@ namespace DotNetCore.CAP.RabbitMQ
private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
{
OnError?.Invoke(sender, e.Cause?.ToString());
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerShutdown,
Reason = e.ToString()
};
OnLog?.Invoke(sender, args);
}
#endregion
}
}
\ No newline at end of file
......@@ -60,17 +60,15 @@ namespace DotNetCore.CAP.SqlServer
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
_logger.LogInformation("published message has been persisted to the database. name:" + message);
}
protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
return Task.CompletedTask;
_logger.LogInformation("published message has been persisted to the database. name:" + message);
}
#region private methods
......
......@@ -8,11 +8,16 @@
<PackageTags>$(PackageTags);SQL Server</PackageTags>
</PropertyGroup>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.SqlServer.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.0" />
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.1" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.2" />
</ItemGroup>
<ItemGroup>
......
......@@ -68,19 +68,6 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];";
}
}
public bool ChangePublishedState(int messageId, string state)
{
var sql =
$"UPDATE [{Options.Schema}].[Published] SET Retries=Retries+1,StatusName = '{state}' WHERE Id={messageId}";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}
// CapReceivedMessage
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
......@@ -124,10 +111,21 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}
public bool ChangePublishedState(int messageId, string state)
{
var sql =
$"UPDATE [{Options.Schema}].[Published] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}";
using (var connection = new SqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}
public bool ChangeReceivedState(int messageId, string state)
{
var sql =
$"UPDATE [{Options.Schema}].[Received] SET Retries=Retries+1,StatusName = '{state}' WHERE Id={messageId}";
$"UPDATE [{Options.Schema}].[Received] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}";
using (var connection = new SqlConnection(Options.ConnectionString))
{
......@@ -153,6 +151,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
catch (SqlException)
{
transaction.Dispose();
connection.Dispose();
throw;
}
......
......@@ -123,7 +123,7 @@ namespace DotNetCore.CAP.Abstractions
"If you are using the EntityFramework, you do not need to use this overloaded.");
}
private Task PublishWithTransAsync(string name, string content)
private async Task PublishWithTransAsync(string name, string content)
{
var message = new CapPublishedMessage
{
......@@ -132,13 +132,11 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled
};
ExecuteAsync(DbConnection, DbTransaction, message);
await ExecuteAsync(DbConnection, DbTransaction, message);
ClosedCap();
PublishQueuer.PulseEvent.Set();
return Task.CompletedTask;
}
private void PublishWithTrans(string name, string content)
......
......@@ -20,9 +20,10 @@ namespace DotNetCore.CAP.Abstractions
public string Name { get; }
/// <summary>
/// Default group name is CapOptions setting.(Assembly name)
/// kafka --> groups.id
/// rabbit MQ --> queue.name
/// </summary>
public string Group { get; set; } = "cap.default.group";
public string Group { get; set; }
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Reflection;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
......@@ -34,6 +35,7 @@ namespace DotNetCore.CAP
/// </summary>
public const int DefaultFailedRetryCount = 100;
public CapOptions()
{
PollingDelay = DefaultPollingDelay;
......@@ -42,10 +44,16 @@ namespace DotNetCore.CAP
FailedRetryInterval = DefaultFailedMessageWaitingInterval;
FailedRetryCount = DefaultFailedRetryCount;
Extensions = new List<ICapOptionsExtension>();
DefaultGroup = "cap.queue." + Assembly.GetEntryAssembly().GetName().Name.ToLower();
}
internal IList<ICapOptionsExtension> Extensions { get; }
/// <summary>
/// Subscriber default group name. kafka-->group name. rabbitmq --> queue name.
/// </summary>
public string DefaultGroup { get; set; }
/// <summary>
/// Producer job polling delay time.
/// Default is 15 sec.
......
......@@ -371,7 +371,7 @@
receivedSucceeded,
receivedFailed,
receivedSucceededStr,
receivedFailedStr,
receivedFailedStr
);
$(window).resize(function() {
......
......@@ -5,6 +5,10 @@
<AssemblyName>DotNetCore.CAP</AssemblyName>
<PackageTags>$(PackageTags);</PackageTags>
</PropertyGroup>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>
<ItemGroup>
<None Remove="Dashboard\Content\css\bootstrap.min.css" />
<None Remove="Dashboard\Content\css\jsonview.min.css" />
......
......@@ -33,6 +33,6 @@ namespace DotNetCore.CAP
event EventHandler<MessageContext> OnMessageReceived;
event EventHandler<string> OnError;
event EventHandler<LogMessageEventArgs> OnLog;
}
}
\ No newline at end of file
......@@ -23,6 +23,7 @@ namespace DotNetCore.CAP
private readonly IServiceProvider _serviceProvider;
private Task _compositeTask;
private bool _disposed;
public ConsumerHandler(
......@@ -44,17 +45,18 @@ namespace DotNetCore.CAP
foreach (var matchGroup in groupingMatches)
Task.Factory.StartNew(() =>
{
using (var client = _consumerClientFactory.Create(matchGroup.Key))
{
RegisterMessageProcessor(client);
{
using (var client = _consumerClientFactory.Create(matchGroup.Key))
{
RegisterMessageProcessor(client);
client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
client.Listening(_pollingDelay, _cts.Token);
}
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
_compositeTask = Task.CompletedTask;
client.Listening(_pollingDelay, _cts.Token);
}
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
_compositeTask = Task.CompletedTask;
}
public void Dispose()
......@@ -62,13 +64,10 @@ namespace DotNetCore.CAP
if (_disposed)
return;
_disposed = true;
_logger.ServerShuttingDown();
_cts.Cancel();
try
{
_compositeTask.Wait(TimeSpan.FromSeconds(10));
_compositeTask.Wait(TimeSpan.FromSeconds(2));
}
catch (AggregateException ex)
{
......@@ -105,7 +104,34 @@ namespace DotNetCore.CAP
Pulse();
};
client.OnError += (sender, reason) => { _logger.MessageQueueError(reason); };
client.OnLog += WriteLog;
}
private void WriteLog(object sender, LogMessageEventArgs logmsg)
{
switch (logmsg.LogType)
{
case MqLogType.ConsumerCancelled:
_logger.LogWarning("RabbitMQ consumer cancelled. reason: " + logmsg.Reason);
break;
case MqLogType.ConsumerRegistered:
_logger.LogInformation("RabbitMQ consumer registered. " + logmsg.Reason);
break;
case MqLogType.ConsumerUnregistered:
_logger.LogWarning("RabbitMQ consumer unregistered. reason: " + logmsg.Reason);
break;
case MqLogType.ConsumerShutdown:
_logger.LogWarning("RabbitMQ consumer shutdown. reason:" + logmsg.Reason);
break;
case MqLogType.ConsumeError:
_logger.LogError("Kakfa client consume error. reason:" + logmsg.Reason);
break;
case MqLogType.ServerConnError:
_logger.LogCritical("Kafka server connection error. reason:" + logmsg.Reason);
break;
default:
throw new ArgumentOutOfRangeException();
}
}
private static void StoreMessage(IServiceScope serviceScope, MessageContext messageContext)
......
......@@ -76,7 +76,7 @@ namespace DotNetCore.CAP
}
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecuting(message?.Name, ex);
_logger.ExceptionOccuredWhileExecuting(message.Name, ex);
fetched.Requeue();
......@@ -89,7 +89,7 @@ namespace DotNetCore.CAP
IState newState;
if (!result.Succeeded)
{
var shouldRetry = UpdateMessageForRetryAsync(message);
var shouldRetry = UpdateMessageForRetry(message);
if (shouldRetry)
{
newState = new ScheduledState();
......@@ -109,7 +109,7 @@ namespace DotNetCore.CAP
return newState;
}
private static bool UpdateMessageForRetryAsync(CapReceivedMessage message)
private static bool UpdateMessageForRetry(CapReceivedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
......
......@@ -10,23 +10,26 @@ namespace DotNetCore.CAP.Internal
{
/// <inheritdoc />
/// <summary>
/// A default <see cref="T:DotNetCore.CAP.Abstractions.IConsumerServiceSelector" /> implementation.
/// A default <see cref="T:DotNetCore.CAP.Abstractions.IConsumerServiceSelector" /> implementation.
/// </summary>
internal class DefaultConsumerServiceSelector : IConsumerServiceSelector
{
private readonly CapOptions _capOptions;
private readonly IServiceProvider _serviceProvider;
/// <summary>
/// Creates a new <see cref="DefaultConsumerServiceSelector" />.
/// Creates a new <see cref="DefaultConsumerServiceSelector" />.
/// </summary>
public DefaultConsumerServiceSelector(IServiceProvider serviceProvider)
public DefaultConsumerServiceSelector(IServiceProvider serviceProvider, CapOptions capOptions)
{
_serviceProvider = serviceProvider;
_capOptions = capOptions;
}
/// <summary>
/// Selects the best <see cref="ConsumerExecutorDescriptor" /> candidate from <paramref name="executeDescriptor" /> for the
/// current message associated.
/// Selects the best <see cref="ConsumerExecutorDescriptor" /> candidate from <paramref name="executeDescriptor" /> for
/// the
/// current message associated.
/// </summary>
public ConsumerExecutorDescriptor SelectBestCandidate(string key,
IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
......@@ -45,7 +48,7 @@ namespace DotNetCore.CAP.Internal
return executorDescriptorList;
}
private static IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(
private IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(
IServiceProvider provider)
{
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
......@@ -66,7 +69,7 @@ namespace DotNetCore.CAP.Internal
}
}
private static IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromControllerTypes()
private IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromControllerTypes()
{
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
......@@ -81,18 +84,21 @@ namespace DotNetCore.CAP.Internal
return executorDescriptorList;
}
private static IEnumerable<ConsumerExecutorDescriptor> GetTopicAttributesDescription(TypeInfo typeInfo)
private IEnumerable<ConsumerExecutorDescriptor> GetTopicAttributesDescription(TypeInfo typeInfo)
{
foreach (var method in typeInfo.DeclaredMethods)
{
var topicAttr = method.GetCustomAttributes<TopicAttribute>(true);
var topicAttributes = topicAttr as IList<TopicAttribute> ?? topicAttr.ToList();
if (!topicAttributes.Any()) continue;
foreach (var attr in topicAttributes)
{
if (attr.Group == null)
attr.Group = _capOptions.DefaultGroup;
yield return InitDescriptor(attr, method, typeInfo);
}
}
}
......
......@@ -28,22 +28,18 @@ namespace DotNetCore.CAP.Internal
public async Task<OperateResult> ExecuteAsync(CapReceivedMessage receivedMessage)
{
if (!_selector.TryGetTopicExector(receivedMessage.Name, receivedMessage.Group,
out var executor))
{
var error = "message can not be found subscriber. Message:" + receivedMessage;
error += "\r\n see: https://github.com/dotnetcore/CAP/issues/63";
throw new SubscriberNotFoundException(error);
}
var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext());
try
{
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name);
if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group))
{
var error = $"Topic:{receivedMessage.Name}, can not be found subscriber method.";
throw new SubscriberNotFoundException(error);
}
// If there are multiple consumers in the same group, we will take the first
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0];
var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext());
var ret = await Invoker.InvokeAsync(consumerContext);
if (!string.IsNullOrEmpty(ret.CallbackName))
await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result);
......
......@@ -2,13 +2,13 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP.Internal
{
internal class MethodMatcherCache
{
private readonly IConsumerServiceSelector _selector;
private List<string> _allTopics;
public MethodMatcherCache(IConsumerServiceSelector selector)
{
......@@ -54,5 +54,50 @@ namespace DotNetCore.CAP.Internal
}
return dic;
}
/// <summary>
/// Attempts to get the topic exector associated with the specified topic name and group name from the <see cref="Entries"/>.
/// </summary>
/// <param name="topicName">The topic name of the value to get.</param>
/// <param name="groupName">The group name of the value to get.</param>
/// <param name="matchTopic">topic exector of the value.</param>
/// <returns>true if the key was found, otherwise false. </returns>
public bool TryGetTopicExector(string topicName, string groupName,
out ConsumerExecutorDescriptor matchTopic)
{
if (Entries == null)
throw new ArgumentNullException(nameof(Entries));
matchTopic = null;
if (Entries.TryGetValue(groupName, out var groupMatchTopics))
{
matchTopic = groupMatchTopics.FirstOrDefault(x => x.Attribute.Name == topicName);
return matchTopic != null;
}
return false;
}
/// <summary>
/// Get all subscribe topics name.
/// </summary>
public IEnumerable<string> GetSubscribeTopics()
{
if (_allTopics != null)
{
return _allTopics;
}
if (Entries == null)
throw new ArgumentNullException(nameof(Entries));
_allTopics = new List<string>();
foreach (var descriptors in Entries.Values)
{
_allTopics.AddRange(descriptors.Select(x => x.Attribute.Name));
}
return _allTopics;
}
}
}
\ No newline at end of file
......@@ -37,7 +37,7 @@ namespace DotNetCore.CAP
"Starting the processors throw an exception.");
_serverShuttingDown = LoggerMessage.Define(
LogLevel.Debug,
LogLevel.Information,
2,
"Shutting down the processing server...");
......
......@@ -47,7 +47,7 @@ namespace DotNetCore.CAP.Models
public override string ToString()
{
return "name:" + Name + ", content:" + Content;
return "name:" + Name + ", group:" + Group + ", content:" + Content;
}
}
}
\ No newline at end of file
using System;
namespace DotNetCore.CAP
{
public enum MqLogType
{
//RabbitMQ
ConsumerCancelled,
ConsumerRegistered,
ConsumerUnregistered,
ConsumerShutdown,
//Kafka
ConsumeError,
ServerConnError
}
public class LogMessageEventArgs : EventArgs
{
public string Reason { get; set; }
public MqLogType LogType { get; set; }
}
}
......@@ -14,14 +14,14 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0-preview-20170810-02" />
<PackageReference Include="MySqlConnector" Version="0.28.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.0" />
<PackageReference Include="xunit" Version="2.3.0" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.0" />
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0" />
<PackageReference Include="MySqlConnector" Version="0.34.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
<PackageReference Include="Moq" Version="4.7.137" />
<PackageReference Include="Moq" Version="4.8.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.0.0" />
......
......@@ -7,11 +7,11 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0-preview-20170810-02" />
<PackageReference Include="Npgsql" Version="3.2.5" />
<PackageReference Include="xunit" Version="2.3.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.0" />
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0" />
<PackageReference Include="Npgsql" Version="3.2.6" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
</ItemGroup>
<ItemGroup>
......
......@@ -11,14 +11,14 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0-preview-20170810-02" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.0" />
<PackageReference Include="xunit" Version="2.3.0" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.0" />
<PackageReference Include="Dapper" Version="1.50.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
<PackageReference Include="Moq" Version="4.7.137" />
<PackageReference Include="Moq" Version="4.8.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.0.0" />
......
......@@ -8,13 +8,13 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0-preview-20170810-02" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0" />
<PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.0" />
<PackageReference Include="xunit" Version="2.3.0" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" />
<PackageReference Include="Moq" Version="4.7.137" />
<PackageReference Include="Moq" Version="4.8.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
</ItemGroup>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment