Unverified Commit 437d7810 authored by Lemon's avatar Lemon Committed by GitHub

Re-implementation language-agent-protocol V5 (#140)

* add ProtocolVersion config

* Add Protocol Version Selection Switch

* add v5 collect service
parent 10b85e67
......@@ -22,7 +22,7 @@ namespace SkyWalking.Config
public class TransportConfig
{
public int PendingSegmentLimit { get; set; } = 30000;
/// <summary>
/// Flush Interval Millisecond
/// </summary>
......@@ -32,5 +32,14 @@ namespace SkyWalking.Config
/// Data queued beyond this time will be discarded.
/// </summary>
public int PendingSegmentTimeout { get; set; } = 1000;
public string ProtocolVersion { get; set; } = ProtocolVersions.V6;
}
public static class ProtocolVersions
{
public static string V5 { get; } = "v5";
public static string V6 { get; } = "v6";
}
}
\ No newline at end of file
......@@ -48,6 +48,6 @@ namespace SkyWalking.Context.Trace
void RelatedGlobalTrace(DistributedTraceId distributedTraceId);
TraceSegmentRequest Transform();
SegmentRequest Transform();
}
}
......@@ -29,6 +29,6 @@ namespace SkyWalking.Context.Trace
int EntryApplicationInstanceId { get; }
TraceSegmentReferenceRequest Transform();
SegmentReferenceRequest Transform();
}
}
\ No newline at end of file
......@@ -21,9 +21,9 @@ using System.Threading.Tasks;
namespace SkyWalking.Transport
{
public interface ITraceDispatcher
public interface ISegmentDispatcher
{
bool Dispatch(TraceSegmentRequest segment);
bool Dispatch(SegmentRequest segment);
Task Flush(CancellationToken token = default(CancellationToken));
......
......@@ -4,9 +4,9 @@ using System.Threading.Tasks;
namespace SkyWalking.Transport
{
public interface ITraceReporter
public interface ISegmentReporter
{
Task ReportAsync(IReadOnlyCollection<TraceSegmentRequest> segmentRequests,
Task ReportAsync(IReadOnlyCollection<SegmentRequest> segmentRequests,
CancellationToken cancellationToken = default(CancellationToken));
}
}
\ No newline at end of file
......@@ -20,11 +20,11 @@ using System.Collections.Generic;
namespace SkyWalking.Transport
{
public class TraceSegmentRequest
public class SegmentRequest
{
public IEnumerable<UniqueIdRequest> UniqueIds { get; set; }
public TraceSegmentObjectRequest Segment { get; set; }
public SegmentObjectRequest Segment { get; set; }
}
public class UniqueIdRequest
......@@ -41,13 +41,13 @@ namespace SkyWalking.Transport
}
}
public class TraceSegmentObjectRequest
public class SegmentObjectRequest
{
public UniqueIdRequest SegmentId { get; set; }
public int ApplicationId { get; set; }
public int ServiceId { get; set; }
public int ApplicationInstanceId { get; set; }
public int ServiceInstanceId { get; set; }
public IList<SpanRequest> Spans { get; set; } = new List<SpanRequest>();
}
......@@ -74,28 +74,28 @@ namespace SkyWalking.Transport
public bool IsError { get; set; }
public IList<TraceSegmentReferenceRequest> References { get; } = new List<TraceSegmentReferenceRequest>();
public IList<SegmentReferenceRequest> References { get; } = new List<SegmentReferenceRequest>();
public IList<KeyValuePair<string, string>> Tags { get; } = new List<KeyValuePair<string, string>>();
public IList<LogDataRequest> Logs { get; } = new List<LogDataRequest>();
}
public class TraceSegmentReferenceRequest
public class SegmentReferenceRequest
{
public UniqueIdRequest ParentTraceSegmentId { get; set; }
public UniqueIdRequest ParentSegmentId { get; set; }
public int ParentApplicationInstanceId { get; set; }
public int ParentServiceInstanceId { get; set; }
public int ParentSpanId { get; set; }
public int EntryApplicationInstanceId { get; set; }
public int EntryServiceInstanceId { get; set; }
public int RefType { get; set; }
public StringOrIntValue ParentServiceName { get; set; }
public StringOrIntValue ParentEndpointName { get; set; }
public StringOrIntValue EntryServiceName { get; set; }
public StringOrIntValue EntryEndpointName { get; set; }
public StringOrIntValue NetworkAddress { get; set; }
}
......
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using SkyWalking.Transport;
namespace SkyWalking
{
[Obsolete]
public interface ISkyWalkingClient
{
Task<NullableValue> RegisterApplicationAsync(string applicationCode, CancellationToken cancellationToken = default(CancellationToken));
Task<NullableValue> RegisterApplicationInstanceAsync(int applicationId, Guid agentUUID, long registerTime, AgentOsInfoRequest osInfoRequest,
CancellationToken cancellationToken = default(CancellationToken));
Task HeartbeatAsync(int applicationInstance, long heartbeatTime, CancellationToken cancellationToken = default(CancellationToken));
Task CollectAsync(IEnumerable<TraceSegmentRequest> request, CancellationToken cancellationToken = default(CancellationToken));
}
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Threading;
using System.Threading.Tasks;
namespace SkyWalking.Transport
{
public interface ISkyWalkingClientV5
{
Task<NullableValue> RegisterApplicationAsync(string applicationCode, CancellationToken cancellationToken = default(CancellationToken));
Task<NullableValue> RegisterApplicationInstanceAsync(int applicationId, Guid agentUUID, long registerTime, AgentOsInfoRequest osInfoRequest,
CancellationToken cancellationToken = default(CancellationToken));
Task HeartbeatAsync(int applicationInstance, long heartbeatTime, CancellationToken cancellationToken = default(CancellationToken));
}
}
\ No newline at end of file
......@@ -33,6 +33,7 @@ using SkyWalking.Logging;
using SkyWalking.Service;
using SkyWalking.Transport;
using SkyWalking.Transport.Grpc;
using SkyWalking.Transport.Grpc.V5;
using SkyWalking.Transport.Grpc.V6;
namespace SkyWalking.Agent.AspNetCore
......@@ -47,11 +48,12 @@ namespace SkyWalking.Agent.AspNetCore
}
services.AddSingleton<IContextCarrierFactory, ContextCarrierFactory>();
services.AddSingleton<ITraceDispatcher, AsyncQueueTraceDispatcher>();
services.AddSingleton<IExecutionService, TraceSegmentTransportService>();
services.AddSingleton<ISegmentDispatcher, AsyncQueueSegmentDispatcher>();
services.AddSingleton<IExecutionService, SegmentReportService>();
services.AddSingleton<IExecutionService, RegisterService>();
services.AddSingleton<IExecutionService, PingService>();
services.AddSingleton<IExecutionService, SamplingRefreshService>();
services.AddSingleton<IExecutionService, ServiceDiscoveryV5Service>();
services.AddSingleton<ISkyWalkingAgentStartup, SkyWalkingAgentStartup>();
services.AddSingleton<ISampler>(DefaultSampler.Instance);
services.AddSingleton<IRuntimeEnvironment>(RuntimeEnvironment.Instance);
......@@ -67,7 +69,8 @@ namespace SkyWalking.Agent.AspNetCore
private static IServiceCollection AddGrpcTransport(this IServiceCollection services)
{
services.AddSingleton<ITraceReporter, TraceReporter>();
services.AddSingleton<ISkyWalkingClientV5, SkyWalkingClientV5>();
services.AddSingleton<ISegmentReporter, SegmentReporter>();
services.AddSingleton<ConnectionManager>();
services.AddSingleton<IPingCaller, PingCaller>();
services.AddSingleton<IServiceRegister, ServiceRegister>();
......
......@@ -2,7 +2,7 @@
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
......@@ -24,6 +24,7 @@ using SkyWalking.Logging;
using SkyWalking.Service;
using SkyWalking.Transport;
using SkyWalking.Transport.Grpc;
using SkyWalking.Transport.Grpc.V5;
using SkyWalking.Transport.Grpc.V6;
using SkyWalking.Utilities.Configuration;
using SkyWalking.Utilities.Logging;
......@@ -35,23 +36,25 @@ namespace SkyWalking.AspNet.Extensions
public static IServiceCollection AddSkyWalkingCore(this IServiceCollection services)
{
services.AddSingleton<IContextCarrierFactory, ContextCarrierFactory>();
services.AddSingleton<ITraceDispatcher, AsyncQueueTraceDispatcher>();
services.AddSingleton<IExecutionService, TraceSegmentTransportService>();
services.AddSingleton<ISegmentDispatcher, AsyncQueueSegmentDispatcher>();
services.AddSingleton<IExecutionService, SegmentReportService>();
services.AddSingleton<IExecutionService, RegisterService>();
services.AddSingleton<IExecutionService, PingService>();
services.AddSingleton<IExecutionService, SamplingRefreshService>();
services.AddSingleton<IExecutionService, ServiceDiscoveryV5Service>();
services.AddSingleton<ISkyWalkingAgentStartup, SkyWalkingAgentStartup>();
services.AddSingleton<ISampler>(DefaultSampler.Instance);
services.AddSingleton<IRuntimeEnvironment>(RuntimeEnvironment.Instance);
services.AddSingleton<TracingDiagnosticProcessorObserver>();
services.AddSingleton<IConfigAccessor, ConfigAccessor>();
services.AddSingleton<IEnvironmentProvider, HostingEnvironmentProvider>();
services.AddSingleton<ITraceReporter, TraceReporter>();
services.AddSingleton<ISegmentReporter, SegmentReporter>();
services.AddSingleton<ConnectionManager>();
services.AddSingleton<IPingCaller, PingCaller>();
services.AddSingleton<IServiceRegister, ServiceRegister>();
services.AddSingleton<IExecutionService, ConnectService>();
services.AddSingleton<ILoggerFactory, DefaultLoggerFactory>();
services.AddSingleton<ISkyWalkingClientV5, SkyWalkingClientV5>();
return services;
}
}
......
......@@ -82,20 +82,20 @@ namespace SkyWalking.Context.Trace
_relatedGlobalTraces.Append(distributedTraceId);
}
public TraceSegmentRequest Transform()
public SegmentRequest Transform()
{
var upstreamSegment = new TraceSegmentRequest
var upstreamSegment = new SegmentRequest
{
UniqueIds = _relatedGlobalTraces.GetRelatedGlobalTraces()
.Select(x => x.ToUniqueId()).ToArray()
};
upstreamSegment.Segment = new TraceSegmentObjectRequest
upstreamSegment.Segment = new SegmentObjectRequest
{
SegmentId = TraceSegmentId.Transform(),
Spans = _spans.Select(x => x.Transform()).ToArray(),
ApplicationId = ApplicationId,
ApplicationInstanceId = ApplicationInstanceId
ServiceId = ApplicationId,
ServiceInstanceId = ApplicationInstanceId
};
return upstreamSegment;
......
......@@ -147,30 +147,30 @@ namespace SkyWalking.Context.Trace
public int EntryApplicationInstanceId => _entryApplicationInstanceId;
public TraceSegmentReferenceRequest Transform()
public SegmentReferenceRequest Transform()
{
TraceSegmentReferenceRequest traceSegmentReference = new TraceSegmentReferenceRequest();
SegmentReferenceRequest segmentReference = new SegmentReferenceRequest();
if (_type == SegmentRefType.CrossProcess)
{
traceSegmentReference.RefType = (int) SegmentRefType.CrossProcess;
traceSegmentReference.NetworkAddress = new StringOrIntValue(_peerId, _peerHost);
segmentReference.RefType = (int) SegmentRefType.CrossProcess;
segmentReference.NetworkAddress = new StringOrIntValue(_peerId, _peerHost);
}
else
{
traceSegmentReference.RefType = (int) SegmentRefType.CrossThread;
traceSegmentReference.NetworkAddress = new StringOrIntValue();
segmentReference.RefType = (int) SegmentRefType.CrossThread;
segmentReference.NetworkAddress = new StringOrIntValue();
}
traceSegmentReference.ParentApplicationInstanceId = _parentApplicationInstanceId;
traceSegmentReference.EntryApplicationInstanceId = _entryApplicationInstanceId;
traceSegmentReference.ParentTraceSegmentId = _traceSegmentId.Transform();
traceSegmentReference.ParentSpanId = _spanId;
segmentReference.ParentServiceInstanceId = _parentApplicationInstanceId;
segmentReference.EntryServiceInstanceId = _entryApplicationInstanceId;
segmentReference.ParentSegmentId = _traceSegmentId.Transform();
segmentReference.ParentSpanId = _spanId;
traceSegmentReference.EntryServiceName = new StringOrIntValue(_entryOperationId, _entryOperationName);
segmentReference.EntryEndpointName = new StringOrIntValue(_entryOperationId, _entryOperationName);
traceSegmentReference.ParentServiceName = new StringOrIntValue(_parentOperationId, _parentOperationName);
segmentReference.ParentEndpointName = new StringOrIntValue(_parentOperationId, _parentOperationName);
return traceSegmentReference;
return segmentReference;
}
}
}
......@@ -19,6 +19,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using SkyWalking.Config;
using SkyWalking.Logging;
using SkyWalking.Transport;
......@@ -27,14 +28,20 @@ namespace SkyWalking.Service
public class PingService : ExecutionService
{
private readonly IPingCaller _pingCaller;
private readonly TransportConfig _transportConfig;
public PingService(IPingCaller pingCaller, IRuntimeEnvironment runtimeEnvironment,
public PingService(IConfigAccessor configAccessor, IPingCaller pingCaller,
IRuntimeEnvironment runtimeEnvironment,
ILoggerFactory loggerFactory) : base(
runtimeEnvironment, loggerFactory)
{
_pingCaller = pingCaller;
_transportConfig = configAccessor.Get<TransportConfig>();
}
protected override bool CanExecute() =>
_transportConfig.ProtocolVersion == ProtocolVersions.V6 && base.CanExecute();
protected override TimeSpan DueTime { get; } = TimeSpan.FromSeconds(30);
protected override TimeSpan Period { get; } = TimeSpan.FromSeconds(60);
......
......@@ -13,6 +13,7 @@ namespace SkyWalking.Service
{
private readonly InstrumentationConfig _config;
private readonly IServiceRegister _serviceRegister;
private readonly TransportConfig _transportConfig;
public RegisterService(IConfigAccessor configAccessor, IServiceRegister serviceRegister,
IRuntimeEnvironment runtimeEnvironment, ILoggerFactory loggerFactory) : base(runtimeEnvironment,
......@@ -20,13 +21,15 @@ namespace SkyWalking.Service
{
_serviceRegister = serviceRegister;
_config = configAccessor.Get<InstrumentationConfig>();
_transportConfig = configAccessor.Get<TransportConfig>();
}
protected override TimeSpan DueTime { get; } = TimeSpan.Zero;
protected override TimeSpan Period { get; } = TimeSpan.FromSeconds(30);
protected override bool CanExecute() => true;
protected override bool CanExecute() =>
_transportConfig.ProtocolVersion == ProtocolVersions.V6 && !RuntimeEnvironment.Initialized;
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
......
......@@ -2,7 +2,7 @@
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
......@@ -27,12 +27,12 @@ using SkyWalking.Transport;
namespace SkyWalking.Service
{
public class TraceSegmentTransportService : ExecutionService, ITracingContextListener
public class SegmentReportService : ExecutionService, ITracingContextListener
{
private readonly TransportConfig _config;
private readonly ITraceDispatcher _dispatcher;
private readonly ISegmentDispatcher _dispatcher;
public TraceSegmentTransportService(IConfigAccessor configAccessor, ITraceDispatcher dispatcher,
public SegmentReportService(IConfigAccessor configAccessor, ISegmentDispatcher dispatcher,
IRuntimeEnvironment runtimeEnvironment, ILoggerFactory loggerFactory)
: base(runtimeEnvironment, loggerFactory)
{
......
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using SkyWalking.Config;
using SkyWalking.Logging;
using SkyWalking.Transport;
using SkyWalking.Utils;
namespace SkyWalking.Service
{
public class ServiceDiscoveryV5Service : ExecutionService
{
private readonly InstrumentationConfig _config;
private readonly TransportConfig _transportConfig;
private readonly ISkyWalkingClientV5 _skyWalkingClient;
protected override TimeSpan DueTime { get; } = TimeSpan.Zero;
protected override TimeSpan Period { get; } = TimeSpan.FromSeconds(30);
public ServiceDiscoveryV5Service(IConfigAccessor configAccessor, ISkyWalkingClientV5 skyWalkingClient,
IRuntimeEnvironment runtimeEnvironment, ILoggerFactory loggerFactory)
: base(runtimeEnvironment, loggerFactory)
{
_config = configAccessor.Get<InstrumentationConfig>();
_transportConfig = configAccessor.Get<TransportConfig>();
_skyWalkingClient = skyWalkingClient;
}
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
await RegisterApplication(cancellationToken);
await RegisterApplicationInstance(cancellationToken);
await Heartbeat(cancellationToken);
}
protected override bool CanExecute() =>
_transportConfig.ProtocolVersion == ProtocolVersions.V5 && !RuntimeEnvironment.Initialized;
private async Task RegisterApplication(CancellationToken cancellationToken)
{
if (!RuntimeEnvironment.ServiceId.HasValue)
{
var value = await Polling(3,
() => _skyWalkingClient.RegisterApplicationAsync(_config.ServiceName ?? _config.ApplicationCode, cancellationToken),
cancellationToken);
if (value.HasValue && RuntimeEnvironment is RuntimeEnvironment environment)
{
environment.ServiceId = value;
Logger.Information($"Registered Application[Id={environment.ServiceId.Value}].");
}
}
}
private async Task RegisterApplicationInstance(CancellationToken cancellationToken)
{
if (RuntimeEnvironment.ServiceId.HasValue && !RuntimeEnvironment.ServiceInstanceId.HasValue)
{
var osInfoRequest = new AgentOsInfoRequest
{
HostName = DnsHelpers.GetHostName(),
IpAddress = DnsHelpers.GetIpV4s(),
OsName = PlatformInformation.GetOSName(),
ProcessNo = Process.GetCurrentProcess().Id
};
var value = await Polling(3,
() => _skyWalkingClient.RegisterApplicationInstanceAsync(RuntimeEnvironment.ServiceId.Value,
RuntimeEnvironment.InstanceId,
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), osInfoRequest, cancellationToken),
cancellationToken);
if (value.HasValue && RuntimeEnvironment is RuntimeEnvironment environment)
{
environment.ServiceInstanceId = value;
Logger.Information(
$"Registered Application Instance[Id={environment.ServiceInstanceId.Value}].");
}
}
}
private static async Task<NullableValue> Polling(int retry, Func<Task<NullableValue>> execute,
CancellationToken cancellationToken)
{
var index = 0;
while (index++ < retry)
{
var value = await execute();
if (value.HasValue)
{
return value;
}
await Task.Delay(500, cancellationToken);
}
return NullableValue.Null;
}
private async Task Heartbeat(CancellationToken cancellationToken)
{
if (RuntimeEnvironment.Initialized)
{
try
{
await _skyWalkingClient.HeartbeatAsync(RuntimeEnvironment.ServiceInstanceId.Value,
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), cancellationToken);
Logger.Debug($"Heartbeat at {DateTimeOffset.UtcNow}.");
}
catch (Exception e)
{
Logger.Error("Heartbeat error.", e);
}
}
}
}
}
\ No newline at end of file
......@@ -42,7 +42,7 @@ namespace SkyWalking
public async Task StartAsync(CancellationToken cancellationToken = default(CancellationToken))
{
_logger.Information(Welcome());
_logger.Information("Initializing ...");
foreach (var service in _services)
await service.StartAsync(cancellationToken);
DiagnosticListener.AllListeners.Subscribe(_observer);
......
......@@ -25,25 +25,25 @@ using SkyWalking.Logging;
namespace SkyWalking.Transport
{
public class AsyncQueueTraceDispatcher : ITraceDispatcher
public class AsyncQueueSegmentDispatcher : ISegmentDispatcher
{
private readonly ILogger _logger;
private readonly TransportConfig _config;
private readonly ITraceReporter _traceReporter;
private readonly ConcurrentQueue<TraceSegmentRequest> _segmentQueue;
private readonly ISegmentReporter _segmentReporter;
private readonly ConcurrentQueue<SegmentRequest> _segmentQueue;
private readonly CancellationTokenSource _cancellation;
public AsyncQueueTraceDispatcher(IConfigAccessor configAccessor, ITraceReporter traceReporter,
public AsyncQueueSegmentDispatcher(IConfigAccessor configAccessor, ISegmentReporter segmentReporter,
ILoggerFactory loggerFactory)
{
_traceReporter = traceReporter;
_logger = loggerFactory.CreateLogger(typeof(AsyncQueueTraceDispatcher));
_segmentReporter = segmentReporter;
_logger = loggerFactory.CreateLogger(typeof(AsyncQueueSegmentDispatcher));
_config = configAccessor.Get<TransportConfig>();
_segmentQueue = new ConcurrentQueue<TraceSegmentRequest>();
_segmentQueue = new ConcurrentQueue<SegmentRequest>();
_cancellation = new CancellationTokenSource();
}
public bool Dispatch(TraceSegmentRequest segment)
public bool Dispatch(SegmentRequest segment)
{
// todo performance optimization for ConcurrentQueue
if (_config.PendingSegmentLimit < _segmentQueue.Count || _cancellation.IsCancellationRequested)
......@@ -64,7 +64,7 @@ namespace SkyWalking.Transport
//var limit = queued <= _config.PendingSegmentLimit ? queued : _config.PendingSegmentLimit;
var limit = _config.PendingSegmentLimit;
var index = 0;
var segments = new List<TraceSegmentRequest>(limit);
var segments = new List<SegmentRequest>(limit);
while (index++ < limit && _segmentQueue.TryDequeue(out var request))
{
segments.Add(request);
......@@ -72,7 +72,7 @@ namespace SkyWalking.Transport
// send async
if (segments.Count > 0)
_traceReporter.ReportAsync(segments, token);
_segmentReporter.ReportAsync(segments, token);
return Task.CompletedTask;
}
......
......@@ -2,7 +2,7 @@
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
......
......@@ -2,7 +2,7 @@
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
......
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using SkyWalking.Config;
using SkyWalking.Logging;
using SegmentReporterV5 = SkyWalking.Transport.Grpc.V5.SegmentReporter;
using SegmentReporterV6 = SkyWalking.Transport.Grpc.V6.SegmentReporter;
namespace SkyWalking.Transport.Grpc
{
public class SegmentReporter : ISegmentReporter
{
private readonly ISegmentReporter _segmentReporterV5;
private readonly ISegmentReporter _segmentReporterV6;
private readonly TransportConfig _transportConfig;
public SegmentReporter(ConnectionManager connectionManager, IConfigAccessor configAccessor,
ILoggerFactory loggerFactory)
{
_transportConfig = configAccessor.Get<TransportConfig>();
_segmentReporterV5 = new SegmentReporterV5(connectionManager, configAccessor, loggerFactory);
_segmentReporterV6 = new SegmentReporterV6(connectionManager, configAccessor, loggerFactory);
}
public async Task ReportAsync(IReadOnlyCollection<SegmentRequest> segmentRequests,
CancellationToken cancellationToken = default(CancellationToken))
{
if (_transportConfig.ProtocolVersion == ProtocolVersions.V6)
await _segmentReporterV6.ReportAsync(segmentRequests, cancellationToken);
if (_transportConfig.ProtocolVersion == ProtocolVersions.V5)
await _segmentReporterV5.ReportAsync(segmentRequests, cancellationToken);
}
}
}
\ No newline at end of file
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Linq;
using Google.Protobuf;
......@@ -5,9 +23,9 @@ using SkyWalking.NetworkProtocol;
namespace SkyWalking.Transport.Grpc
{
internal static class TraceSegmentHelpers
internal static class SegmentV5Helpers
{
public static UpstreamSegment Map(TraceSegmentRequest request)
public static UpstreamSegment Map(SegmentRequest request)
{
var upstreamSegment = new UpstreamSegment();
......@@ -16,8 +34,8 @@ namespace SkyWalking.Transport.Grpc
var traceSegment = new TraceSegmentObject
{
TraceSegmentId = MapToUniqueId(request.Segment.SegmentId),
ApplicationId = request.Segment.ApplicationId,
ApplicationInstanceId = request.Segment.ApplicationInstanceId,
ApplicationId = request.Segment.ServiceId,
ApplicationInstanceId = request.Segment.ServiceInstanceId,
IsSizeLimited = false
};
......@@ -60,20 +78,20 @@ namespace SkyWalking.Transport.Grpc
return spanObject;
}
private static TraceSegmentReference MapToSegmentReference(TraceSegmentReferenceRequest referenceRequest)
private static TraceSegmentReference MapToSegmentReference(SegmentReferenceRequest referenceRequest)
{
var reference = new TraceSegmentReference
{
ParentApplicationInstanceId = referenceRequest.ParentApplicationInstanceId,
EntryApplicationInstanceId = referenceRequest.EntryApplicationInstanceId,
ParentApplicationInstanceId = referenceRequest.ParentServiceInstanceId,
EntryApplicationInstanceId = referenceRequest.EntryServiceInstanceId,
ParentSpanId = referenceRequest.ParentSpanId,
RefType = (RefType) referenceRequest.RefType,
ParentTraceSegmentId = MapToUniqueId(referenceRequest.ParentTraceSegmentId)
ParentTraceSegmentId = MapToUniqueId(referenceRequest.ParentSegmentId)
};
ReadStringOrIntValue(reference, referenceRequest.NetworkAddress, NetworkAddressReader, NetworkAddressIdReader);
ReadStringOrIntValue(reference, referenceRequest.EntryServiceName, EntryServiceReader, EntryServiceIdReader);
ReadStringOrIntValue(reference, referenceRequest.ParentServiceName, ParentServiceReader, ParentServiceIdReader);
ReadStringOrIntValue(reference, referenceRequest.EntryEndpointName, EntryServiceReader, EntryServiceIdReader);
ReadStringOrIntValue(reference, referenceRequest.ParentEndpointName, ParentServiceReader, ParentServiceIdReader);
return reference;
}
......
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Linq;
using Google.Protobuf;
using SkyWalking.NetworkProtocol;
namespace SkyWalking.Transport.Grpc
{
internal static class SegmentV6Helpers
{
public static UpstreamSegment Map(SegmentRequest request)
{
var upstreamSegment = new UpstreamSegment();
upstreamSegment.GlobalTraceIds.AddRange(request.UniqueIds.Select(MapToUniqueId).ToArray());
var traceSegment = new SegmentObject
{
TraceSegmentId = MapToUniqueId(request.Segment.SegmentId),
ServiceId = request.Segment.ServiceId,
ServiceInstanceId = request.Segment.ServiceInstanceId,
IsSizeLimited = false
};
traceSegment.Spans.Add(request.Segment.Spans.Select(MapToSpan).ToArray());
upstreamSegment.Segment = traceSegment.ToByteString();
return upstreamSegment;
}
private static UniqueId MapToUniqueId(UniqueIdRequest uniqueIdRequest)
{
var uniqueId = new UniqueId();
uniqueId.IdParts.Add(uniqueIdRequest.Part1);
uniqueId.IdParts.Add(uniqueIdRequest.Part2);
uniqueId.IdParts.Add(uniqueIdRequest.Part3);
return uniqueId;
}
private static SpanObjectV2 MapToSpan(SpanRequest request)
{
var spanObject = new SpanObjectV2
{
SpanId = request.SpanId,
ParentSpanId = request.ParentSpanId,
StartTime = request.StartTime,
EndTime = request.EndTime,
SpanType = (SpanType) request.SpanType,
SpanLayer = (SpanLayer) request.SpanLayer,
IsError = request.IsError
};
ReadStringOrIntValue(spanObject, request.Component, ComponentReader, ComponentIdReader);
ReadStringOrIntValue(spanObject, request.OperationName, OperationNameReader, OperationNameIdReader);
ReadStringOrIntValue(spanObject, request.Peer, PeerReader, PeerIdReader);
spanObject.Tags.Add(request.Tags.Select(x => new KeyStringValuePair {Key = x.Key, Value = x.Value}));
spanObject.Refs.AddRange(request.References.Select(MapToSegmentReference).ToArray());
spanObject.Logs.AddRange(request.Logs.Select(MapToLogMessage).ToArray());
return spanObject;
}
private static SegmentReference MapToSegmentReference(SegmentReferenceRequest referenceRequest)
{
var reference = new SegmentReference
{
ParentServiceInstanceId = referenceRequest.ParentServiceInstanceId,
EntryServiceInstanceId = referenceRequest.EntryServiceInstanceId,
ParentSpanId = referenceRequest.ParentSpanId,
RefType = (RefType) referenceRequest.RefType,
ParentTraceSegmentId = MapToUniqueId(referenceRequest.ParentSegmentId)
};
ReadStringOrIntValue(reference, referenceRequest.NetworkAddress, NetworkAddressReader,
NetworkAddressIdReader);
ReadStringOrIntValue(reference, referenceRequest.EntryEndpointName, EntryEndpointReader,
EntryEndpointIdReader);
ReadStringOrIntValue(reference, referenceRequest.ParentEndpointName, ParentEndpointReader,
ParentEndpointIdReader);
return reference;
}
private static Log MapToLogMessage(LogDataRequest request)
{
var logMessage = new Log {Time = request.Timestamp};
logMessage.Data.AddRange(request.Data.Select(x => new KeyStringValuePair {Key = x.Key, Value = x.Value})
.ToArray());
return logMessage;
}
private static void ReadStringOrIntValue<T>(T instance, StringOrIntValue stringOrIntValue,
Action<T, string> stringValueReader, Action<T, int> intValueReader)
{
if (stringOrIntValue.HasStringValue)
{
stringValueReader.Invoke(instance, stringOrIntValue.GetStringValue());
}
else if (stringOrIntValue.HasIntValue)
{
intValueReader.Invoke(instance, stringOrIntValue.GetIntValue());
}
}
private static readonly Action<SpanObjectV2, string> ComponentReader = (s, val) => s.Component = val;
private static readonly Action<SpanObjectV2, int> ComponentIdReader = (s, val) => s.ComponentId = val;
private static readonly Action<SpanObjectV2, string> OperationNameReader = (s, val) => s.OperationName = val;
private static readonly Action<SpanObjectV2, int> OperationNameIdReader = (s, val) => s.OperationNameId = val;
private static readonly Action<SpanObjectV2, string> PeerReader = (s, val) => s.Peer = val;
private static readonly Action<SpanObjectV2, int> PeerIdReader = (s, val) => s.PeerId = val;
private static readonly Action<SegmentReference, string> NetworkAddressReader =
(s, val) => s.NetworkAddress = val;
private static readonly Action<SegmentReference, int> NetworkAddressIdReader =
(s, val) => s.NetworkAddressId = val;
private static readonly Action<SegmentReference, string>
EntryEndpointReader = (s, val) => s.EntryEndpoint = val;
private static readonly Action<SegmentReference, int> EntryEndpointIdReader =
(s, val) => s.EntryEndpointId = val;
private static readonly Action<SegmentReference, string> ParentEndpointReader =
(s, val) => s.ParentEndpoint = val;
private static readonly Action<SegmentReference, int> ParentEndpointIdReader =
(s, val) => s.ParentEndpointId = val;
}
}
\ No newline at end of file
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using SkyWalking.Config;
using SkyWalking.Logging;
using SkyWalking.NetworkProtocol;
namespace SkyWalking.Transport.Grpc.V5
{
internal class SegmentReporter : ISegmentReporter
{
private readonly ConnectionManager _connectionManager;
private readonly ILogger _logger;
private readonly GrpcConfig _config;
public SegmentReporter(ConnectionManager connectionManager, IConfigAccessor configAccessor,
ILoggerFactory loggerFactory)
{
_connectionManager = connectionManager;
_config = configAccessor.Get<GrpcConfig>();
_logger = loggerFactory.CreateLogger(typeof(SegmentReporter));
}
public async Task ReportAsync(IReadOnlyCollection<SegmentRequest> segmentRequests,
CancellationToken cancellationToken = default(CancellationToken))
{
if (!_connectionManager.Ready)
{
return;
}
var connection = _connectionManager.GetConnection();
try
{
var stopwatch = Stopwatch.StartNew();
var client = new TraceSegmentService.TraceSegmentServiceClient(connection);
using (var asyncClientStreamingCall =
client.collect(null, _config.GetReportTimeout(), cancellationToken))
{
foreach (var segment in segmentRequests)
await asyncClientStreamingCall.RequestStream.WriteAsync(SegmentV5Helpers.Map(segment));
await asyncClientStreamingCall.RequestStream.CompleteAsync();
await asyncClientStreamingCall.ResponseAsync;
}
stopwatch.Stop();
_logger.Information($"Report {segmentRequests.Count} trace segment. cost: {stopwatch.Elapsed}s");
}
catch (Exception ex)
{
_logger.Error("Report trace segment fail.", ex);
_connectionManager.Failure(ex);
}
}
}
}
\ No newline at end of file
/*
* Licensed to the OpenSkywalking under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenSkywalking licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using System.Threading;
using System.Threading.Tasks;
using SkyWalking.Config;
using SkyWalking.Logging;
using SkyWalking.NetworkProtocol;
namespace SkyWalking.Transport.Grpc.V5
{
public class SkyWalkingClientV5 : ISkyWalkingClientV5
{
private readonly ConnectionManager _connectionManager;
private readonly ILogger _logger;
private readonly GrpcConfig _config;
public SkyWalkingClientV5(ConnectionManager connectionManager, IConfigAccessor configAccessor,
ILoggerFactory loggerFactory)
{
_connectionManager = connectionManager;
_config = configAccessor.Get<GrpcConfig>();
_logger = loggerFactory.CreateLogger(typeof(SkyWalkingClientV5));
}
public async Task<NullableValue> RegisterApplicationAsync(string applicationCode,
CancellationToken cancellationToken = default(CancellationToken))
{
if (!_connectionManager.Ready)
{
return NullableValue.Null;
}
var connection = _connectionManager.GetConnection();
var client = new ApplicationRegisterService.ApplicationRegisterServiceClient(connection);
return await new Call(_logger, _connectionManager).Execute(async () =>
{
var applicationMapping = await client.applicationCodeRegisterAsync(
new Application {ApplicationCode = applicationCode},
null, _config.GetTimeout(), cancellationToken);
return new NullableValue(applicationMapping?.Application?.Value ?? 0);
},
() => NullableValue.Null,
() => ExceptionHelpers.RegisterApplicationError);
}
public async Task<NullableValue> RegisterApplicationInstanceAsync(int applicationId, Guid agentUUID,
long registerTime, AgentOsInfoRequest osInfoRequest,
CancellationToken cancellationToken = default(CancellationToken))
{
if (!_connectionManager.Ready)
{
return NullableValue.Null;
}
var connection = _connectionManager.GetConnection();
var client = new InstanceDiscoveryService.InstanceDiscoveryServiceClient(connection);
var applicationInstance = new ApplicationInstance
{
ApplicationId = applicationId,
AgentUUID = agentUUID.ToString("N"),
RegisterTime = registerTime,
Osinfo = new OSInfo
{
OsName = osInfoRequest.OsName,
Hostname = osInfoRequest.HostName,
ProcessNo = osInfoRequest.ProcessNo
}
};
applicationInstance.Osinfo.Ipv4S.AddRange(osInfoRequest.IpAddress);
return await new Call(_logger, _connectionManager).Execute(async () =>
{
var applicationInstanceMapping = await client.registerInstanceAsync(applicationInstance, null,
_config.GetTimeout(), cancellationToken);
return new NullableValue(applicationInstanceMapping?.ApplicationInstanceId ?? 0);
},
() => NullableValue.Null,
() => ExceptionHelpers.RegisterApplicationInstanceError);
}
public async Task HeartbeatAsync(int applicationInstance, long heartbeatTime,
CancellationToken cancellationToken = default(CancellationToken))
{
if (!_connectionManager.Ready)
{
return;
}
var connection = _connectionManager.GetConnection();
var client = new InstanceDiscoveryService.InstanceDiscoveryServiceClient(connection);
var heartbeat = new ApplicationInstanceHeartbeat
{
ApplicationInstanceId = applicationInstance,
HeartbeatTime = heartbeatTime
};
await new Call(_logger, _connectionManager).Execute(
async () => await client.heartbeatAsync(heartbeat, null, _config.GetTimeout(), cancellationToken),
() => ExceptionHelpers.HeartbeatError);
}
}
}
\ No newline at end of file
......@@ -19,7 +19,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using SkyWalking.Config;
......@@ -28,21 +27,21 @@ using SkyWalking.NetworkProtocol;
namespace SkyWalking.Transport.Grpc.V6
{
public class TraceReporter : ITraceReporter
internal class SegmentReporter : ISegmentReporter
{
private readonly ConnectionManager _connectionManager;
private readonly ILogger _logger;
private readonly GrpcConfig _config;
public TraceReporter(ConnectionManager connectionManager, IConfigAccessor configAccessor,
public SegmentReporter(ConnectionManager connectionManager, IConfigAccessor configAccessor,
ILoggerFactory loggerFactory)
{
_connectionManager = connectionManager;
_config = configAccessor.Get<GrpcConfig>();
_logger = loggerFactory.CreateLogger(typeof(TraceReporter));
_logger = loggerFactory.CreateLogger(typeof(SegmentReporter));
}
public async Task ReportAsync(IReadOnlyCollection<TraceSegmentRequest> segmentRequests,
public async Task ReportAsync(IReadOnlyCollection<SegmentRequest> segmentRequests,
CancellationToken cancellationToken = default(CancellationToken))
{
if (!_connectionManager.Ready)
......@@ -60,10 +59,11 @@ namespace SkyWalking.Transport.Grpc.V6
client.collect(null, _config.GetReportTimeout(), cancellationToken))
{
foreach (var segment in segmentRequests)
await asyncClientStreamingCall.RequestStream.WriteAsync(TraceSegmentHelpers.Map(segment));
await asyncClientStreamingCall.RequestStream.WriteAsync(SegmentV6Helpers.Map(segment));
await asyncClientStreamingCall.RequestStream.CompleteAsync();
await asyncClientStreamingCall.ResponseAsync;
}
stopwatch.Stop();
_logger.Information($"Report {segmentRequests.Count} trace segment. cost: {stopwatch.Elapsed}s");
}
......
......@@ -19,6 +19,7 @@
using System.Collections.Generic;
using System.IO;
using Microsoft.Extensions.Configuration;
using SkyWalking.Config;
namespace SkyWalking.Utilities.Configuration
{
......@@ -36,6 +37,7 @@ namespace SkyWalking.Utilities.Configuration
{"SkyWalking:Logging:Level", "Information"},
{"SkyWalking:Logging:FilePath", defaultLogFile},
{"SkyWalking:Transport:Interval", "3000"},
{"SkyWalking:Transport:ProtocolVersion", ProtocolVersions.V6},
{"SkyWalking:Transport:PendingSegmentLimit", "30000"},
{"SkyWalking:Transport:PendingSegmentTimeout", "1000"},
{"SkyWalking:Transport:gRPC:Servers", "localhost:11800"},
......
......@@ -30,7 +30,7 @@ namespace SkyWalking.Utilities.Logging
public class DefaultLoggerFactory : ILoggerFactory
{
private const string outputTemplate =
@"{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{ApplicationCode}] [{Level}] {SourceContext} : {Message}{NewLine}{Exception}";
@"{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{ServiceName}] [{Level}] {SourceContext} : {Message}{NewLine}{Exception}";
private readonly MSLoggerFactory _loggerFactory;
private readonly LoggingConfig _loggingConfig;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment