Commit 2152c285 authored by Lemon's avatar Lemon Committed by 吴晟 Wu Sheng

Support gRPC re-connection and client load balancing (#30)

* Support gRPC re-connection and client load balancing

* Add more grpc connection logs

* Use DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()

* Fix register application instance

* Refactor gRPC reconnection.
parent b117da03
......@@ -152,7 +152,7 @@ namespace SkyWalking.Context.Trace
.Add("error.kind", exception.GetType().FullName)
.Add("message", exception.Message)
.Add("stack", exception.StackTrace)
.Build(DateTime.UtcNow.GetTimeMillis()));
.Build(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()));
return this;
}
......@@ -176,7 +176,7 @@ namespace SkyWalking.Context.Trace
public virtual ISpan Start()
{
_startTime = DateTime.UtcNow.GetTimeMillis();
_startTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
return this;
}
......@@ -205,7 +205,7 @@ namespace SkyWalking.Context.Trace
/// <returns></returns>
public virtual bool Finish(ITraceSegment owner)
{
_endTime = DateTime.UtcNow.GetTimeMillis();
_endTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
owner.Archive(this);
return true;
}
......
......@@ -61,9 +61,9 @@ namespace SkyWalking.AspNetCore
_logger.Info("SkyWalking Agent starting...");
try
{
await GrpcChannelManager.Instance.ConnectAsync();
await ServiceManager.Instance.Initialize();
DiagnosticListener.AllListeners.Subscribe(_diagnosticObserver);
await GrpcConnectionManager.Instance.ConnectAsync();
await ServiceManager.Instance.Initialize();
_logger.Info("SkyWalking Agent started.");
}
catch (Exception e)
......@@ -77,7 +77,7 @@ namespace SkyWalking.AspNetCore
_logger.Info("SkyWalking Agent stopping...");
try
{
await GrpcChannelManager.Instance.ShutdownAsync();
await GrpcConnectionManager.Instance.ShutdownAsync();
ServiceManager.Instance.Dispose();
_logger.Info("SkyWalking Agent stopped.");
}
......
......@@ -68,11 +68,11 @@ namespace SkyWalking.Boot
{
await service.Initialize(_tokenSource.Token);
_services.Add(service.GetType(), service);
_logger.Debug($"ServiceManager init {service.GetType()}.");
_logger.Debug($"ServiceManager loaded {service.GetType()}.");
}
catch (Exception e)
{
_logger.Error($"ServiceManager init {service.GetType()} fail.",e);
_logger.Error($"ServiceManager loaded {service.GetType()} fail.",e);
}
}
}
......
......@@ -40,6 +40,7 @@ namespace SkyWalking.Boot
await Initializing(token);
_task = Task.Factory.StartNew(async () =>
{
await Starting(token);
while (true)
{
try
......@@ -61,6 +62,11 @@ namespace SkyWalking.Boot
return Task.CompletedTask;
}
protected virtual Task Starting(CancellationToken token)
{
return Task.CompletedTask;
}
protected abstract Task Execute(CancellationToken token);
}
}
\ No newline at end of file
......@@ -25,7 +25,7 @@ namespace SkyWalking.Context.Ids
{
public static class GlobalIdGenerator
{
private static readonly ThreadLocal<IDContext> threadIdSequence = new ThreadLocal<IDContext>(() => new IDContext(DateTime.Now.GetTimeMillis(), 0));
private static readonly ThreadLocal<IDContext> threadIdSequence = new ThreadLocal<IDContext>(() => new IDContext(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), 0));
public static ID Generate()
{
......@@ -67,7 +67,7 @@ namespace SkyWalking.Context.Ids
private long GetTimestamp()
{
long currentTimeMillis = DateTime.Now.GetTimeMillis();
long currentTimeMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (currentTimeMillis < _lastTimestamp)
{
......
......@@ -357,7 +357,7 @@ namespace SkyWalking.Context
return false;
}
var currentTimeMillis = DateTime.UtcNow.GetTimeMillis();
var currentTimeMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (currentTimeMillis - _lastWarningTimestamp > 30 * 1000)
{
//todo log warning
......
......@@ -51,7 +51,7 @@ namespace SkyWalking.Diagnostics
if (listener.Name == diagnosticProcessor.ListenerName)
{
OnNext(listener, diagnosticProcessor);
_logger.Debug($"TracingDiagnosticObserver -- Subscribe {diagnosticProcessor.ListenerName}.");
_logger.Debug($"TracingDiagnosticObserver subscribe diagnosticListener named [{diagnosticProcessor.ListenerName}].");
}
}
}
......
......@@ -25,84 +25,116 @@ using SkyWalking.Boot;
using SkyWalking.Config;
using SkyWalking.Context;
using SkyWalking.Dictionarys;
using SkyWalking.Logging;
using SkyWalking.NetworkProtocol;
namespace SkyWalking.Remote
{
public class GrpcApplicationService : TimerService
{
private static readonly ILogger _logger = LogManager.GetLogger<GrpcApplicationService>();
public override int Order { get; } = -1;
protected override async Task Initializing(CancellationToken token)
{
var application = new Application {ApplicationCode = AgentConfig.ApplicationCode};
var applicationRegisterService =
new ApplicationRegisterService.ApplicationRegisterServiceClient(GrpcChannelManager.Instance.Channel);
var applicationId = default(int?);
protected override TimeSpan Interval { get; } = TimeSpan.FromSeconds(15);
while (!applicationId.HasValue || DictionaryUtil.IsNull(applicationId.Value))
protected override async Task Execute(CancellationToken token)
{
if (!DictionaryUtil.IsNull(RemoteDownstreamConfig.Agent.ApplicationId) &&
!DictionaryUtil.IsNull(RemoteDownstreamConfig.Agent.ApplicationInstanceId))
{
var applicationMapping = await applicationRegisterService.applicationCodeRegisterAsync(application);
applicationId = applicationMapping?.Application?.Value;
return;
}
RemoteDownstreamConfig.Agent.ApplicationId = applicationId.Value;
var instanceDiscoveryService =
new InstanceDiscoveryService.InstanceDiscoveryServiceClient(GrpcChannelManager.Instance.Channel);
var agentUUID = Guid.NewGuid().ToString().Replace("-", "");
var registerTime = DateTime.UtcNow.GetTimeMillis();
var availableConnection = GrpcConnectionManager.Instance.GetAvailableConnection();
var hostName = Dns.GetHostName();
var osInfo = new OSInfo
if (availableConnection == null)
{
Hostname = hostName,
OsName = Environment.OSVersion.ToString(),
ProcessNo = Process.GetCurrentProcess().Id
};
// todo fix Device not configured
//var ipv4s = Dns.GetHostAddresses(hostName);
//foreach (var ipAddress in ipv4s.Where(x => x.AddressFamily == AddressFamily.InterNetwork))
// osInfo.Ipv4S.Add(ipAddress.ToString());
var applicationInstance = new ApplicationInstance
{
ApplicationId = applicationId.Value,
AgentUUID = agentUUID,
RegisterTime = registerTime,
Osinfo = osInfo
};
var applicationInstanceId = 0;
while (DictionaryUtil.IsNull(applicationInstanceId))
{
var applicationInstanceMapping = await instanceDiscoveryService.registerInstanceAsync(applicationInstance);
applicationInstanceId = applicationInstanceMapping.ApplicationInstanceId;
_logger.Warning(
$"Register application fail. {GrpcConnectionManager.NotFoundErrorMessage}");
return;
}
RemoteDownstreamConfig.Agent.ApplicationInstanceId = applicationInstanceId;
}
protected override TimeSpan Interval { get; } = TimeSpan.FromMinutes(1);
protected override async Task Execute(CancellationToken token)
{
var instanceDiscoveryService =
new InstanceDiscoveryService.InstanceDiscoveryServiceClient(GrpcChannelManager.Instance.Channel);
var heartbeat = new ApplicationInstanceHeartbeat
{
ApplicationInstanceId = RemoteDownstreamConfig.Agent.ApplicationInstanceId,
HeartbeatTime = DateTime.UtcNow.GetTimeMillis()
};
await instanceDiscoveryService.heartbeatAsync(heartbeat);
try
{
if (DictionaryUtil.IsNull(RemoteDownstreamConfig.Agent.ApplicationId))
{
var application = new Application {ApplicationCode = AgentConfig.ApplicationCode};
var applicationRegisterService =
new ApplicationRegisterService.ApplicationRegisterServiceClient(availableConnection
.GrpcChannel);
var applicationMapping = await applicationRegisterService.applicationCodeRegisterAsync(application);
var applicationId = applicationMapping?.Application?.Value;
if (!applicationId.HasValue || DictionaryUtil.IsNull(applicationId.Value))
{
_logger.Warning(
"Register application fail. Server response null.");
return;
}
_logger.Info(
$"Register application success. [applicationCode] = {application.ApplicationCode}. [applicationId] = {applicationId.Value}");
RemoteDownstreamConfig.Agent.ApplicationId = applicationId.Value;
}
if (DictionaryUtil.IsNull(RemoteDownstreamConfig.Agent.ApplicationInstanceId))
{
var instanceDiscoveryService =
new InstanceDiscoveryService.InstanceDiscoveryServiceClient(availableConnection.GrpcChannel);
var agentUUID = Guid.NewGuid().ToString("N");
var registerTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var hostName = Dns.GetHostName();
var osInfo = new OSInfo
{
Hostname = hostName,
OsName = Environment.OSVersion.ToString(),
ProcessNo = Process.GetCurrentProcess().Id
};
// todo fix Device not configured
//var ipv4s = Dns.GetHostAddresses(hostName);
//foreach (var ipAddress in ipv4s.Where(x => x.AddressFamily == AddressFamily.InterNetwork))
// osInfo.Ipv4S.Add(ipAddress.ToString());
var applicationInstance = new ApplicationInstance
{
ApplicationId = RemoteDownstreamConfig.Agent.ApplicationId,
AgentUUID = agentUUID,
RegisterTime = registerTime,
Osinfo = osInfo
};
var applicationInstanceId = 0;
var retry = 0;
while (retry++ <= 3 && DictionaryUtil.IsNull(applicationInstanceId))
{
var applicationInstanceMapping =
await instanceDiscoveryService.registerInstanceAsync(applicationInstance);
await Task.Delay(TimeSpan.FromSeconds(1), token);
applicationInstanceId = applicationInstanceMapping.ApplicationInstanceId;
}
if (!DictionaryUtil.IsNull(applicationInstanceId))
{
RemoteDownstreamConfig.Agent.ApplicationInstanceId = applicationInstanceId;
_logger.Info(
$"Register application instance success. [applicationInstanceId] = {applicationInstanceId}");
}
else
{
_logger.Warning(
"Register application instance fail. Server response null.");
}
}
}
catch (Exception exception)
{
_logger.Warning($"Try register application fail. {exception.Message}");
availableConnection?.Failure();
}
}
}
}
\ No newline at end of file
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Threading.Tasks;
using Grpc.Core;
using SkyWalking.Logging;
namespace SkyWalking.Remote
{
public class GrpcConnection
{
private static readonly ILogger _logger = LogManager.GetLogger<GrpcConnection>();
private readonly Channel _internalChannel;
private readonly string _server;
private GrpcConnectionState _state = GrpcConnectionState.Idle;
public Channel GrpcChannel => _internalChannel;
public GrpcConnectionState State => _state;
public string Server => _server;
public GrpcConnection(string server)
{
_server = server;
_internalChannel = new Channel(server, ChannelCredentials.Insecure);
}
public async Task<bool> ConnectAsync()
{
if (_state == GrpcConnectionState.Ready)
{
return true;
}
_state = GrpcConnectionState.Connecting;
try
{
// default timeout = 5s
var deadLine = DateTime.UtcNow.AddSeconds(5);
await _internalChannel.ConnectAsync(deadLine);
_state = GrpcConnectionState.Ready;
_logger.Info($"Grpc channel connect success. [Server] = {_internalChannel.Target}");
}
catch (TaskCanceledException ex)
{
_state = GrpcConnectionState.Failure;
_logger.Warning($"Grpc channel connect timeout. {ex.Message}");
}
catch (Exception ex)
{
_state = GrpcConnectionState.Failure;
_logger.Warning($"Grpc channel connect fail. {ex.Message}");
}
return _state == GrpcConnectionState.Ready;
}
public async Task ShutdowmAsync()
{
try
{
await _internalChannel.ShutdownAsync();
}
catch (Exception e)
{
_logger.Warning($"Grpc channel shutdown fail. {e.Message}");
}
finally
{
_state = GrpcConnectionState.Shutdown;
}
}
public bool CheckState()
{
return _state == GrpcConnectionState.Ready && _internalChannel.State == ChannelState.Ready;
}
public void Failure()
{
var currentState = _state;
if (GrpcConnectionState.Ready == currentState)
{
_logger.Debug($"Grpc channel state changed. {_state} -> {_internalChannel.State}");
}
_state = GrpcConnectionState.Failure;
}
}
}
\ No newline at end of file
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;
using SkyWalking.Config;
using SkyWalking.Logging;
namespace SkyWalking.Remote
{
public class GrpcConnectionManager
{
private static readonly ILogger _logger = LogManager.GetLogger<GrpcConnectionManager>();
private static readonly GrpcConnectionManager _client = new GrpcConnectionManager();
public const string NotFoundErrorMessage = "Not found available connection.";
public static GrpcConnectionManager Instance => _client;
private readonly Random _random = new Random();
private readonly AsyncLock _lock = new AsyncLock();
private GrpcConnection _connection;
private GrpcConnectionManager()
{
}
public async Task ConnectAsync()
{
// using async lock
using (await _lock.LockAsync())
{
if (_connection != null && _connection.CheckState())
{
return;
}
_connection = new GrpcConnection(GetServer(_connection?.Server));
await _connection.ConnectAsync();
}
}
public async Task ShutdownAsync()
{
await _connection?.ShutdowmAsync();
}
public GrpcConnection GetAvailableConnection()
{
var connection = _connection;
if (connection == null || connection.State != GrpcConnectionState.Ready)
{
_logger.Debug(NotFoundErrorMessage);
return null;
}
return connection;
}
private string GetServer(string currentServer)
{
var servers = RemoteDownstreamConfig.Collector.gRPCServers.Distinct().ToArray();
if (servers.Length == 1)
{
return servers[0];
}
if (currentServer != null)
{
servers = servers.Where(x => x != currentServer).ToArray();
}
var index = _random.Next() % servers.Length;
return servers[index];
}
}
}
\ No newline at end of file
......@@ -16,37 +16,24 @@
*
*/
using System.Linq;
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using SkyWalking.Boot;
using SkyWalking.Config;
namespace SkyWalking.Remote
{
public class GrpcChannelManager
public class GrpcConnectionService : TimerService
{
private static readonly GrpcChannelManager _client = new GrpcChannelManager();
protected override TimeSpan Interval { get; } = TimeSpan.FromMinutes(1);
public static GrpcChannelManager Instance => _client;
private Channel _channel;
public Channel Channel => _channel;
private GrpcChannelManager()
{
_channel = new Channel(RemoteDownstreamConfig.Collector.gRPCServers.First(), ChannelCredentials.Insecure);
}
public Task ConnectAsync()
{
return _channel.ConnectAsync();
}
public Task ShutdownAsync()
protected override async Task Execute(CancellationToken token)
{
return _channel.ShutdownAsync();
var connection = GrpcConnectionManager.Instance.GetAvailableConnection();
if (connection == null || !connection.CheckState())
{
await GrpcConnectionManager.Instance.ConnectAsync();
}
}
}
}
\ No newline at end of file
......@@ -16,17 +16,14 @@
*
*/
using System;
namespace SkyWalking.Context
namespace SkyWalking.Remote
{
public static class DateTimeExtensions
public enum GrpcConnectionState
{
private static readonly DateTime dtFrom = new DateTime(1970, 1, 1, 0, 0, 0, 0);
public static long GetTimeMillis(this DateTime dateTime)
{
return (dateTime.Ticks - dtFrom.Ticks) / 10000;
}
Idle,
Connecting,
Ready,
Failure,
Shutdown
}
}
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Threading;
using System.Threading.Tasks;
using SkyWalking.Boot;
using SkyWalking.Config;
using SkyWalking.Context;
using SkyWalking.Dictionarys;
using SkyWalking.Logging;
using SkyWalking.NetworkProtocol;
namespace SkyWalking.Remote
{
public class GrpcHeartbeatService : TimerService
{
private static readonly ILogger _logger = LogManager.GetLogger<GrpcHeartbeatService>();
protected override TimeSpan Interval { get; } = TimeSpan.FromMinutes(1);
protected override async Task Starting(CancellationToken token)
{
await Task.Delay(TimeSpan.FromSeconds(15));
}
protected override async Task Execute(CancellationToken token)
{
if (DictionaryUtil.IsNull(RemoteDownstreamConfig.Agent.ApplicationInstanceId))
{
_logger.Warning($"{DateTime.Now} Heartbeat fail. Application instance is not registered.");
return;
}
var availableConnection = GrpcConnectionManager.Instance.GetAvailableConnection();
if (availableConnection == null)
{
_logger.Warning($"{DateTime.Now} Heartbeat fail. {GrpcConnectionManager.NotFoundErrorMessage}");
return;
}
try
{
var instanceDiscoveryService =
new InstanceDiscoveryService.InstanceDiscoveryServiceClient(availableConnection.GrpcChannel);
var heartbeat = new ApplicationInstanceHeartbeat
{
ApplicationInstanceId = RemoteDownstreamConfig.Agent.ApplicationInstanceId,
HeartbeatTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
await instanceDiscoveryService.heartbeatAsync(heartbeat);
_logger.Debug($"{DateTime.Now} Heartbeat.");
}
catch (Exception e)
{
_logger.Warning($"{DateTime.Now} Heartbeat fail. {e.Message}");
availableConnection?.Failure();
}
}
}
}
\ No newline at end of file
......@@ -17,6 +17,7 @@
*/
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using SkyWalking.Boot;
......@@ -30,7 +31,7 @@ namespace SkyWalking.Remote
public class GrpcTraceSegmentService : IBootService, ITracingContextListener
{
private static readonly ILogger _logger = LogManager.GetLogger<GrpcTraceSegmentService>();
public void Dispose()
{
TracingContext.ListenerManager.Remove(this);
......@@ -51,21 +52,33 @@ namespace SkyWalking.Remote
return;
}
var availableConnection = GrpcConnectionManager.Instance.GetAvailableConnection();
if (availableConnection == null)
{
_logger.Warning(
$"Transform and send UpstreamSegment to collector fail. {GrpcConnectionManager.NotFoundErrorMessage}");
return;
}
try
{
var segment = traceSegment.Transform();
var traceSegmentService =
new TraceSegmentService.TraceSegmentServiceClient(GrpcChannelManager.Instance.Channel);
new TraceSegmentService.TraceSegmentServiceClient(availableConnection.GrpcChannel);
using (var asyncClientStreamingCall = traceSegmentService.collect())
{
await asyncClientStreamingCall.RequestStream.WriteAsync(segment);
await asyncClientStreamingCall.RequestStream.CompleteAsync();
}
_logger.Debug(
$"Transform and send UpstreamSegment to collector. TraceSegmentId : {traceSegment.TraceSegmentId}");
}
catch (Exception e)
{
_logger.Error("Transform and send UpstreamSegment to collector fail.", e);
}
_logger.Warning($"Transform and send UpstreamSegment to collector fail. {e.Message}");
availableConnection?.Failure();
}
}
}
}
\ No newline at end of file
......@@ -15,6 +15,7 @@
<ProjectReference Include="..\SkyWalking.Abstractions\SkyWalking.Abstractions.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Nito.AsyncEx.Coordination" Version="5.0.0-pre-05" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.4.1" />
</ItemGroup>
</Project>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment