Commit 3dd8451a authored by snakorse's avatar snakorse Committed by Lemon

optimize: send trace segments to collector in batch queue. (#62)

* optimize: send trace segments to collector in batch queue.

* bugfix: batch send trace segments process should continue until the number of segments left in the queue lessthan the _batchsize.

* bugfix: improve the batch sending segments process, and limit the max memory usage of the pending queue.
parent b366155f
...@@ -46,7 +46,14 @@ namespace SkyWalking.Config ...@@ -46,7 +46,14 @@ namespace SkyWalking.Config
/// The max number of spans in a single segment. Through this config item, skywalking keep your application memory cost estimated. /// The max number of spans in a single segment. Through this config item, skywalking keep your application memory cost estimated.
/// </summary> /// </summary>
public static int SpanLimitPerSegment = 300; public static int SpanLimitPerSegment = 300;
/// <summary>
/// The max number of segments in the memory queue waiting to be sent to collector.
/// It means that when the number of queued segments reachs this limit,
/// any more segments enqueued into the sending queue, will leads the same number of oldest queued segments dequeued and droped.
/// Zero or minus value means no limit.
/// </summary>
public static int PendingSegmentsLimit = 300000;
} }
} }
\ No newline at end of file
...@@ -47,5 +47,13 @@ namespace SkyWalking.AspNet ...@@ -47,5 +47,13 @@ namespace SkyWalking.AspNet
/// Namespace isolates headers in cross propagation. The HEADER name will be 'HeaderName:Namespace'. /// Namespace isolates headers in cross propagation. The HEADER name will be 'HeaderName:Namespace'.
/// </summary> /// </summary>
public string Namespace { get; set; } public string Namespace { get; set; }
/// <summary>
/// The max number of segments in the memory queue waiting to be sent to collector.
/// It means that when the number of queued segments reachs this limit,
/// any more segments enqueued into the sending queue, will leads the same number of oldest queued segments dequeued and droped.
/// Zero or minus value means no limit.
/// </summary>
public int PendingSegmentsLimit { get; set; } = 300000;
} }
} }
\ No newline at end of file
...@@ -53,6 +53,11 @@ namespace SkyWalking.AspNet ...@@ -53,6 +53,11 @@ namespace SkyWalking.AspNet
{ {
AgentConfig.SamplePer3Secs = v; AgentConfig.SamplePer3Secs = v;
} }
var pendingSegmentsLimit = GetAppSetting(nameof(AgentConfig.PendingSegmentsLimit), false);
if(int.TryParse(pendingSegmentsLimit, out v))
{
AgentConfig.PendingSegmentsLimit = v;
}
} }
private string GetAppSetting(string key, bool @throw) private string GetAppSetting(string key, bool @throw)
......
...@@ -54,6 +54,7 @@ namespace SkyWalking.AspNetCore ...@@ -54,6 +54,7 @@ namespace SkyWalking.AspNetCore
AgentConfig.ApplicationCode = options.Value.ApplicationCode; AgentConfig.ApplicationCode = options.Value.ApplicationCode;
CollectorConfig.DirectServers = options.Value.DirectServers; CollectorConfig.DirectServers = options.Value.DirectServers;
AgentConfig.SamplePer3Secs = options.Value.SamplePer3Secs; AgentConfig.SamplePer3Secs = options.Value.SamplePer3Secs;
AgentConfig.PendingSegmentsLimit = options.Value.PendingSegmentsLimit;
_logger = LogManager.GetLogger<SkyWalkingHostedService>(); _logger = LogManager.GetLogger<SkyWalkingHostedService>();
_diagnosticObserver = diagnosticObserver; _diagnosticObserver = diagnosticObserver;
} }
...@@ -79,8 +80,8 @@ namespace SkyWalking.AspNetCore ...@@ -79,8 +80,8 @@ namespace SkyWalking.AspNetCore
_logger.Info("SkyWalking Agent stopping..."); _logger.Info("SkyWalking Agent stopping...");
try try
{ {
await GrpcConnectionManager.Instance.ShutdownAsync();
ServiceManager.Instance.Dispose(); ServiceManager.Instance.Dispose();
await GrpcConnectionManager.Instance.ShutdownAsync();
_logger.Info("SkyWalking Agent stopped."); _logger.Info("SkyWalking Agent stopped.");
} }
catch (Exception e) catch (Exception e)
......
...@@ -58,5 +58,17 @@ namespace SkyWalking.AspNetCore ...@@ -58,5 +58,17 @@ namespace SkyWalking.AspNetCore
get; get;
set; set;
} = -1; } = -1;
/// <summary>
/// The max number of segments in the memory queue waiting to be sent to collector.
/// It means that when the number of queued segments reachs this limit,
/// any more segments enqueued into the sending queue, will leads the same number of oldest queued segments dequeued and droped.
/// Zero or minus value means no limit.
/// </summary>
public int PendingSegmentsLimit
{
get;
set;
} = 300000;
} }
} }
...@@ -17,10 +17,13 @@ ...@@ -17,10 +17,13 @@
*/ */
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using SkyWalking.Boot; using SkyWalking.Boot;
using SkyWalking.Config;
using SkyWalking.Context; using SkyWalking.Context;
using SkyWalking.Context.Trace; using SkyWalking.Context.Trace;
using SkyWalking.Logging; using SkyWalking.Logging;
...@@ -29,21 +32,30 @@ using SkyWalking.Utils; ...@@ -29,21 +32,30 @@ using SkyWalking.Utils;
namespace SkyWalking.Remote namespace SkyWalking.Remote
{ {
public class GrpcTraceSegmentService : IBootService, ITracingContextListener public class GrpcTraceSegmentService : TimerService, ITracingContextListener
{ {
private static readonly ILogger _logger = LogManager.GetLogger<GrpcTraceSegmentService>(); private static readonly ILogger _logger = LogManager.GetLogger<GrpcTraceSegmentService>();
private static readonly ConcurrentQueue<ITraceSegment> _traceSegments
= new ConcurrentQueue<ITraceSegment>();
public void Dispose() public override void Dispose()
{ {
TracingContext.ListenerManager.Remove(this); TracingContext.ListenerManager.Remove(this);
if(_traceSegments.Count > 0)
{
BatchSendTraceSegments().GetAwaiter().GetResult();
}
base.Dispose();
} }
public int Order { get; } = 1; public override int Order { get; } = 1;
public Task Initialize(CancellationToken token) protected override TimeSpan Interval => TimeSpan.FromSeconds(1);
protected override Task Initializing(CancellationToken token)
{ {
TracingContext.ListenerManager.Add(this); TracingContext.ListenerManager.Add(this);
return TaskUtils.CompletedTask; return base.Initializing(token);
} }
public async void AfterFinished(ITraceSegment traceSegment) public async void AfterFinished(ITraceSegment traceSegment)
...@@ -53,6 +65,23 @@ namespace SkyWalking.Remote ...@@ -53,6 +65,23 @@ namespace SkyWalking.Remote
return; return;
} }
if (_traceSegments.Count >= AgentConfig.PendingSegmentsLimit && AgentConfig.PendingSegmentsLimit > 0)
{
_traceSegments.TryDequeue(out var v);
}
_traceSegments.Enqueue(traceSegment);
}
protected async override Task Execute(CancellationToken token)
{
await BatchSendTraceSegments();
}
private async Task BatchSendTraceSegments()
{
if (_traceSegments.Count == 0)
return;
var availableConnection = GrpcConnectionManager.Instance.GetAvailableConnection(); var availableConnection = GrpcConnectionManager.Instance.GetAvailableConnection();
if (availableConnection == null) if (availableConnection == null)
{ {
...@@ -63,23 +92,25 @@ namespace SkyWalking.Remote ...@@ -63,23 +92,25 @@ namespace SkyWalking.Remote
try try
{ {
var segment = traceSegment.Transform();
var traceSegmentService = var traceSegmentService =
new TraceSegmentService.TraceSegmentServiceClient(availableConnection.GrpcChannel); new TraceSegmentService.TraceSegmentServiceClient(availableConnection.GrpcChannel);
using (var asyncClientStreamingCall = traceSegmentService.collect()) using (var asyncClientStreamingCall = traceSegmentService.collect())
{ {
await asyncClientStreamingCall.RequestStream.WriteAsync(segment); while (_traceSegments.TryDequeue(out var segment))
{
await asyncClientStreamingCall.RequestStream.WriteAsync(segment.Transform());
_logger.Debug(
$"Transform and send UpstreamSegment to collector. [TraceSegmentId] = {segment.TraceSegmentId} [GlobalTraceId] = {segment.RelatedGlobalTraces.FirstOrDefault()}");
}
await asyncClientStreamingCall.RequestStream.CompleteAsync(); await asyncClientStreamingCall.RequestStream.CompleteAsync();
await asyncClientStreamingCall.ResponseAsync; await asyncClientStreamingCall.ResponseAsync;
} }
_logger.Debug(
$"Transform and send UpstreamSegment to collector. [TraceSegmentId] = {traceSegment.TraceSegmentId} [GlobalTraceId] = {traceSegment.RelatedGlobalTraces.FirstOrDefault()}");
} }
catch (Exception e) catch (Exception e)
{ {
_logger.Warning($"Transform and send UpstreamSegment to collector fail. {e.Message}"); _logger.Warning($"Transform and send UpstreamSegment to collector fail. {e.Message}");
availableConnection?.Failure(); availableConnection?.Failure();
return;
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment