Commit cc7f2a1d authored by Savorboard's avatar Savorboard

fix consul discovery bug.

parent c531fbdd
...@@ -3,6 +3,8 @@ using System.Diagnostics; ...@@ -3,6 +3,8 @@ using System.Diagnostics;
using System.Net; using System.Net;
using System.Text; using System.Text;
using DotNetCore.CAP.Dashboard.Monitoring; using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.NodeDiscovery;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.Dashboard namespace DotNetCore.CAP.Dashboard
{ {
...@@ -85,13 +87,29 @@ namespace DotNetCore.CAP.Dashboard ...@@ -85,13 +87,29 @@ namespace DotNetCore.CAP.Dashboard
var monitoring = Storage.GetMonitoringApi(); var monitoring = Storage.GetMonitoringApi();
var dto = monitoring.GetStatistics(); var dto = monitoring.GetStatistics();
if (CapCache.Global.TryGet("cap.nodes.count", out var count)) SetServersCount(dto);
dto.Servers = (int) count;
return dto; return dto;
}); });
} }
private void SetServersCount(StatisticsDto dto)
{
if (CapCache.Global.TryGet("cap.nodes.count", out var count))
{
dto.Servers = (int) count;
}
else
{
if (RequestServices.GetService<DiscoveryOptions>() != null)
{
var discoveryProvider = RequestServices.GetService<INodeDiscoveryProvider>();
var nodes = discoveryProvider.GetNodes().GetAwaiter().GetResult();
dto.Servers = nodes.Count;
}
}
}
/// <exclude /> /// <exclude />
protected void WriteLiteral(string textToAppend) protected void WriteLiteral(string textToAppend)
{ {
......
using System; using System;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.NodeDiscovery namespace DotNetCore.CAP.NodeDiscovery
{ {
internal class DiscoveryProviderFactory : IDiscoveryProviderFactory internal class DiscoveryProviderFactory : IDiscoveryProviderFactory
{ {
private readonly ILoggerFactory _loggerFactory;
public DiscoveryProviderFactory(ILoggerFactory loggerFactory)
{
_loggerFactory = loggerFactory;
}
public INodeDiscoveryProvider Create(DiscoveryOptions options) public INodeDiscoveryProvider Create(DiscoveryOptions options)
{ {
if (options == null) if (options == null)
throw new ArgumentNullException(nameof(options)); throw new ArgumentNullException(nameof(options));
return new ConsulNodeDiscoveryProvider(options); return new ConsulNodeDiscoveryProvider(_loggerFactory, options);
} }
} }
} }
\ No newline at end of file
...@@ -3,16 +3,19 @@ using System.Collections.Generic; ...@@ -3,16 +3,19 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Consul; using Consul;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.NodeDiscovery namespace DotNetCore.CAP.NodeDiscovery
{ {
public class ConsulNodeDiscoveryProvider : INodeDiscoveryProvider, IDisposable public class ConsulNodeDiscoveryProvider : INodeDiscoveryProvider, IDisposable
{ {
private readonly ILogger<ConsulNodeDiscoveryProvider> _logger;
private readonly DiscoveryOptions _options; private readonly DiscoveryOptions _options;
private ConsulClient _consul; private ConsulClient _consul;
public ConsulNodeDiscoveryProvider(DiscoveryOptions options) public ConsulNodeDiscoveryProvider(ILoggerFactory logger, DiscoveryOptions options)
{ {
_logger = logger.CreateLogger<ConsulNodeDiscoveryProvider>();
_options = options; _options = options;
InitClient(); InitClient();
...@@ -27,46 +30,64 @@ namespace DotNetCore.CAP.NodeDiscovery ...@@ -27,46 +30,64 @@ namespace DotNetCore.CAP.NodeDiscovery
{ {
try try
{ {
var services = await _consul.Agent.Services(); var nodes = new List<Node>();
var services = await _consul.Catalog.Services();
var nodes = services.Response.Select(x => new Node foreach (var service in services.Response)
{ {
Id = x.Key, var serviceInfo = await _consul.Catalog.Service(service.Key);
Name = x.Value.Service, var node = serviceInfo.Response.
Address = x.Value.Address, SkipWhile(x => !x.ServiceTags.Contains("CAP"))
Port = x.Value.Port, .Select(info => new Node
Tags = string.Join(", ", x.Value.Tags) {
}); Id = info.ServiceID,
var nodeList = nodes.ToList(); Name = info.ServiceName,
Address = info.ServiceAddress,
Port = info.ServicePort,
Tags = string.Join(", ", info.ServiceTags)
}).ToList();
nodes.AddRange(node);
}
CapCache.Global.AddOrUpdate("cap.nodes.count", nodeList.Count, TimeSpan.FromSeconds(30), true); CapCache.Global.AddOrUpdate("cap.nodes.count", nodes.Count, TimeSpan.FromSeconds(60), true);
return nodeList; return nodes;
} }
catch (Exception) catch (Exception ex)
{ {
CapCache.Global.AddOrUpdate("cap.nodes.count", 0, TimeSpan.FromSeconds(20));
_logger.LogError("Get consul nodes raised an exception. Exception:" + ex.Message);
return null; return null;
} }
} }
public Task RegisterNode() public Task RegisterNode()
{ {
return _consul.Agent.ServiceRegister(new AgentServiceRegistration try
{ {
ID = _options.NodeId.ToString(), return _consul.Agent.ServiceRegister(new AgentServiceRegistration
Name = _options.NodeName,
Address = _options.CurrentNodeHostName,
Port = _options.CurrentNodePort,
Tags = new[] {"CAP", "Client", "Dashboard"},
Check = new AgentServiceCheck
{ {
DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(30), ID = _options.NodeId.ToString(),
Interval = TimeSpan.FromSeconds(10), Name = _options.NodeName,
Status = HealthStatus.Passing, Address = _options.CurrentNodeHostName,
HTTP = Port = _options.CurrentNodePort,
$"http://{_options.CurrentNodeHostName}:{_options.CurrentNodePort}{_options.MatchPath}/health" Tags = new[] { "CAP", "Client", "Dashboard" },
} Check = new AgentServiceCheck
}); {
DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(30),
Interval = TimeSpan.FromSeconds(10),
Status = HealthStatus.Passing,
HTTP =
$"http://{_options.CurrentNodeHostName}:{_options.CurrentNodePort}{_options.MatchPath}/health"
}
});
}
catch (Exception ex)
{
_logger.LogError("Register consul nodes raised an exception. Exception:" + ex.Message);
return null;
}
} }
public void InitClient() public void InitClient()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment