Commit 28a0c4c2 authored by yangxiaodong's avatar yangxiaodong

Repair the controller cannot read to customer service

parent b4d604ef
...@@ -24,7 +24,7 @@ namespace Sample.Kafka.Controllers ...@@ -24,7 +24,7 @@ namespace Sample.Kafka.Controllers
} }
public string ServerPath => ((IHostingEnvironment)HttpContext.RequestServices.GetService(typeof(IHostingEnvironment))).ContentRootPath; public string ServerPath => ((IHostingEnvironment)HttpContext.RequestServices.GetService(typeof(IHostingEnvironment))).ContentRootPath;
[KafkaTopic("zzwl.topic.finace.callBack", GroupOrExchange = "test")] [KafkaTopic("zzwl.topic.finace.callBack", Group = "test")]
public void KafkaTest(Person person) public void KafkaTest(Person person)
{ {
Console.WriteLine(person.Name); Console.WriteLine(person.Name);
......
...@@ -12,11 +12,7 @@ ...@@ -12,11 +12,7 @@
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute> <GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute> <GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<None Include="CAP.KafkaOptions.cs" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.9.5" /> <PackageReference Include="Confluent.Kafka" Version="0.9.5" />
</ItemGroup> </ItemGroup>
......
...@@ -84,64 +84,44 @@ namespace DotNetCore.CAP.Kafka ...@@ -84,64 +84,44 @@ namespace DotNetCore.CAP.Kafka
{ {
var provider = scopedContext.Provider; var provider = scopedContext.Provider;
var messageStore = provider.GetRequiredService<ICapMessageStore>(); var messageStore = provider.GetRequiredService<ICapMessageStore>();
try var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync();
if (message != null)
{ {
var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync(); try
if (message != null)
{ {
var sp = Stopwatch.StartNew(); var sp = Stopwatch.StartNew();
message.StatusName = StatusName.Processing; message.StatusName = StatusName.Processing;
await messageStore.UpdateSentMessageAsync(message); await messageStore.UpdateSentMessageAsync(message);
var jobResult = ExecuteJob(message.KeyName, message.Content); await ExecuteJobAsync(message.KeyName, message.Content);
sp.Stop(); sp.Stop();
if (!jobResult) message.StatusName = StatusName.Succeeded;
{ await messageStore.UpdateSentMessageAsync(message);
_logger.JobFailed(new Exception("topic send failed"));
}
else
{
//TODO : the state will be deleted when release.
message.StatusName = StatusName.Succeeded;
await messageStore.UpdateSentMessageAsync(message);
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
}
}
catch (Exception)
{
return false;
}
}
return true;
}
private bool ExecuteJob(string topic, string content) _logger.JobExecuted(sp.Elapsed.TotalSeconds);
{
try
{
var config = _kafkaOptions.AsRdkafkaConfig();
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
var message = producer.ProduceAsync(topic, null, content).Result;
if (message.Error.Code == ErrorCode.NoError)
{
return true;
} }
else catch (Exception ex)
{ {
_logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex);
return false; return false;
} }
} }
} }
catch (Exception ex) return true;
}
private Task ExecuteJobAsync(string topic, string content)
{
var config = _kafkaOptions.AsRdkafkaConfig();
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{ {
_logger.ExceptionOccuredWhileExecutingJob(topic, ex); producer.ProduceAsync(topic, null, content);
return false; producer.Flush();
} }
return Task.CompletedTask;
} }
} }
} }
\ No newline at end of file
...@@ -24,10 +24,14 @@ namespace DotNetCore.CAP.Abstractions ...@@ -24,10 +24,14 @@ namespace DotNetCore.CAP.Abstractions
} }
/// <summary> /// <summary>
/// the consumer group. /// kafak --> groups.id
/// rabbitmq --> queue.name
/// </summary> /// </summary>
public string GroupOrExchange { get; set; } public string Group { get; set; } = "cap.default.group";
/// <summary>
/// unused now
/// </summary>
public bool IsOneWay { get; set; } public bool IsOneWay { get; set; }
} }
} }
\ No newline at end of file
...@@ -80,9 +80,9 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -80,9 +80,9 @@ namespace Microsoft.Extensions.DependencyInjection
var types = Assembly.GetEntryAssembly().ExportedTypes; var types = Assembly.GetEntryAssembly().ExportedTypes;
foreach (var type in types) foreach (var type in types)
{ {
if (typeof(IConsumerService).IsAssignableFrom(type)) if (Helper.IsController(type.GetTypeInfo()))
{ {
services.AddSingleton(typeof(IConsumerService), type); services.AddSingleton(typeof(object), type);
} }
} }
} }
......
...@@ -54,8 +54,7 @@ namespace DotNetCore.CAP ...@@ -54,8 +54,7 @@ namespace DotNetCore.CAP
public void Start() public void Start()
{ {
var matchs = _selector.GetCandidatesMethods(_serviceProvider); var matchs = _selector.GetCandidatesMethods(_serviceProvider);
var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.Group);
var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.GroupOrExchange);
foreach (var matchGroup in groupingMatchs) foreach (var matchGroup in groupingMatchs)
{ {
......
using System; using System;
using System.Reflection;
using Newtonsoft.Json; using Newtonsoft.Json;
namespace DotNetCore.CAP.Infrastructure namespace DotNetCore.CAP.Infrastructure
...@@ -46,5 +47,35 @@ namespace DotNetCore.CAP.Infrastructure ...@@ -46,5 +47,35 @@ namespace DotNetCore.CAP.Infrastructure
{ {
return Epoch.AddSeconds(value); return Epoch.AddSeconds(value);
} }
public static bool IsController(TypeInfo typeInfo)
{
if (!typeInfo.IsClass)
{
return false;
}
if (typeInfo.IsAbstract)
{
return false;
}
if (!typeInfo.IsPublic)
{
return false;
}
if (typeInfo.ContainsGenericParameters)
{
return false;
}
if (!typeInfo.Name.EndsWith("Controller", StringComparison.OrdinalIgnoreCase))
{
return false;
}
return true;
}
} }
} }
\ No newline at end of file
...@@ -3,6 +3,7 @@ using System.Collections.Generic; ...@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Reflection; using System.Reflection;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.Internal namespace DotNetCore.CAP.Internal
...@@ -38,9 +39,22 @@ namespace DotNetCore.CAP.Internal ...@@ -38,9 +39,22 @@ namespace DotNetCore.CAP.Internal
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(IServiceProvider provider) public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(IServiceProvider provider)
{ {
var consumerServices = provider.GetServices<IConsumerService>(); var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(provider));
executorDescriptorList.AddRange(FindConsumersFromControllerTypes(provider));
return executorDescriptorList;
}
private IReadOnlyList<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(
IServiceProvider provider)
{
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
var consumerServices = provider.GetServices<IConsumerService>();
foreach (var service in consumerServices) foreach (var service in consumerServices)
{ {
var typeInfo = service.GetType().GetTypeInfo(); var typeInfo = service.GetType().GetTypeInfo();
...@@ -57,6 +71,31 @@ namespace DotNetCore.CAP.Internal ...@@ -57,6 +71,31 @@ namespace DotNetCore.CAP.Internal
executorDescriptorList.Add(InitDescriptor(topicAttr, method, typeInfo)); executorDescriptorList.Add(InitDescriptor(topicAttr, method, typeInfo));
} }
} }
return executorDescriptorList;
}
private IReadOnlyList<ConsumerExecutorDescriptor> FindConsumersFromControllerTypes(
IServiceProvider provider)
{
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
// at cap startup time, find all Controller into the DI container,the type is object.
var controllers = provider.GetServices<object>();
foreach (var controller in controllers)
{
var typeInfo = controller.GetType().GetTypeInfo();
//double check
if (Helper.IsController(typeInfo))
{
foreach (var method in typeInfo.DeclaredMethods)
{
var topicAttr = method.GetCustomAttribute<TopicAttribute>(true);
if (topicAttr == null) continue;
executorDescriptorList.Add(InitDescriptor(topicAttr, method, typeInfo));
}
}
continue;
}
return executorDescriptorList; return executorDescriptorList;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment