Skip to content

Commit

Permalink
[otlp] Fix TODOs, Refactor Buffer Size Handling, and Cleanup Environm…
Browse files Browse the repository at this point in the history
…ent Variables (#6009)

Co-authored-by: Mikel Blanchard <mblanchard@macrosssoftware.com>
  • Loading branch information
rajkumar-rangaraj and CodeBlanch authored Dec 5, 2024
1 parent 0c775e5 commit 305597d
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ internal static IOpenTelemetryBuilder UseOtlpExporter(
/// <para><see cref="IConfiguration"/> to bind onto <see cref="OtlpExporterBuilderOptions"/>.</para>
/// <para>Notes:
/// <list type="bullet">
/// <item docLink="true">See [TODO:Add doc link] for details on the configuration
/// schema.</item>
/// <item docLink="true"><see href="https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md"/>
/// for details on the configuration schema.</item>
/// <item>The <see cref="OtlpExporterBuilderOptions"/> instance will be
/// named "otlp" by default when calling this method.</item>
/// </list>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ internal sealed class ExperimentalOptions

public const string OtlpDiskRetryDirectoryPathEnvVar = "OTEL_DOTNET_EXPERIMENTAL_OTLP_DISK_RETRY_DIRECTORY_PATH";

public const string OtlpUseCustomSerializer = "OTEL_DOTNET_EXPERIMENTAL_USE_CUSTOM_PROTOBUF_SERIALIZER";

public ExperimentalOptions()
: this(new ConfigurationBuilder().AddEnvironmentVariables().Build())
{
Expand All @@ -31,11 +29,6 @@ public ExperimentalOptions(IConfiguration configuration)
this.EmitLogEventAttributes = emitLogEventAttributes;
}

if (configuration.TryGetBoolValue(OpenTelemetryProtocolExporterEventSource.Log, OtlpUseCustomSerializer, out var useCustomSerializer))
{
this.UseCustomProtobufSerializer = useCustomSerializer;
}

if (configuration.TryGetStringValue(OtlpRetryEnvVar, out var retryPolicy) && retryPolicy != null)
{
if (retryPolicy.Equals("in_memory", StringComparison.OrdinalIgnoreCase))
Expand Down Expand Up @@ -85,9 +78,4 @@ public ExperimentalOptions(IConfiguration configuration)
/// Gets the path on disk where the telemetry will be stored for retries at a later point.
/// </summary>
public string? DiskRetryDirectoryPath { get; }

/// <summary>
/// Gets a value indicating whether custom serializer should be used for OTLP export.
/// </summary>
public bool UseCustomProtobufSerializer { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ internal static class GrpcStatusDeserializer
TimeSpan.FromTicks(retryInfo.Value.RetryDelay.Value.Nanos / 100); // Convert nanos to ticks
}
}
catch
catch (Exception ex)
{
// TODO: Log exception to event source.
OpenTelemetryProtocolExporterEventSource.Log.GrpcRetryDelayParsingFailed(grpcStatusDetailsHeader, ex);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ public void RequestTimedOut(Uri endpoint, Exception ex)
}
}

[NonEvent]
public void GrpcRetryDelayParsingFailed(string? grpcStatusDetailsHeader, Exception ex)
{
if (Log.IsEnabled(EventLevel.Warning, EventKeywords.All))
{
this.GrpcRetryDelayParsingFailed(grpcStatusDetailsHeader ?? "null", ex.ToInvariantString());
}
}

[Event(2, Message = "Exporter failed send data to collector to {0} endpoint. Data will not be sent. Exception: {1}", Level = EventLevel.Error)]
public void FailedToReachCollector(string rawCollectorUri, string ex)
{
Expand Down Expand Up @@ -205,6 +214,12 @@ public void ExportFailure(string endpoint, string message)
this.WriteEvent(23, endpoint, message);
}

[Event(24, Message = "Failed to parse gRPC retry delay from header grpcStatusDetailsHeader: '{0}'. Exception: {1}", Level = EventLevel.Warning)]
public void GrpcRetryDelayParsingFailed(string grpcStatusDetailsHeader, string exception)
{
this.WriteEvent(24, grpcStatusDetailsHeader, exception);
}

void IConfigurationExtensionsLogger.LogInvalidConfigurationValue(string key, string value)
{
this.InvalidConfigurationValue(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,32 @@ internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOpti
logRecords.Add(logRecord);
}

writePosition = WriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList);
writePosition = TryWriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList);
ProtobufSerializer.WriteReservedLength(buffer, logsDataLengthPosition, writePosition - (logsDataLengthPosition + ReserveSizeForLength));
ReturnLogRecordListToPool();

return writePosition;
}

internal static int TryWriteResourceLogs(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, Dictionary<string, List<LogRecord>> scopeLogs)
{
try
{
writePosition = WriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, scopeLogs);
}
catch (IndexOutOfRangeException)
{
if (!ProtobufSerializer.IncreaseBufferSize(ref buffer, OtlpSignalType.Logs))
{
throw;
}

return TryWriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, scopeLogs);
}

return writePosition;
}

internal static void ReturnLogRecordListToPool()
{
if (ScopeLogsList.Count != 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,32 @@ internal static int WriteMetricsData(byte[] buffer, int writePosition, Resources
metrics.Add(metric);
}

writePosition = WriteResourceMetrics(buffer, writePosition, resource, ScopeMetricsList);
writePosition = TryWriteResourceMetrics(buffer, writePosition, resource, ScopeMetricsList);
ProtobufSerializer.WriteReservedLength(buffer, mericsDataLengthPosition, writePosition - (mericsDataLengthPosition + ReserveSizeForLength));
ReturnMetricListToPool();

return writePosition;
}

internal static int TryWriteResourceMetrics(byte[] buffer, int writePosition, Resources.Resource? resource, Dictionary<string, List<Metric>> scopeMetrics)
{
try
{
writePosition = WriteResourceMetrics(buffer, writePosition, resource, scopeMetrics);
}
catch (IndexOutOfRangeException)
{
if (!ProtobufSerializer.IncreaseBufferSize(ref buffer, OtlpSignalType.Metrics))
{
throw;
}

return TryWriteResourceMetrics(buffer, writePosition, resource, scopeMetrics);
}

return writePosition;
}

private static void ReturnMetricListToPool()
{
if (ScopeMetricsList.Count != 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ internal static int TryWriteResourceSpans(byte[] buffer, int writePosition, SdkL
// and avoids stack overflow.
return TryWriteResourceSpans(buffer, writePosition, sdkLimitOptions, resource);
}
catch
{
throw;
}

return writePosition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public static THeaders GetHeaders<THeaders>(this OtlpExporterOptions options, Ac
return headers;
}

public static OtlpExporterTransmissionHandler GetProtobufExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions, OtlpSignalType otlpSignalType)
public static OtlpExporterTransmissionHandler GetExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions, OtlpSignalType otlpSignalType)
{
var exportClient = GetProtobufExportClient(options, otlpSignalType);
var exportClient = GetExportClient(options, otlpSignalType);

// `HttpClient.Timeout.TotalMilliseconds` would be populated with the correct timeout value for both the exporter configuration cases:
// 1. User provides their own HttpClient. This case is straightforward as the user wants to use their `HttpClient` and thereby the same client's timeout value.
Expand Down Expand Up @@ -88,7 +88,7 @@ public static OtlpExporterTransmissionHandler GetProtobufExportTransmissionHandl
}
}

public static IExportClient GetProtobufExportClient(this OtlpExporterOptions options, OtlpSignalType otlpSignalType)
public static IExportClient GetExportClient(this OtlpExporterOptions options, OtlpSignalType otlpSignalType)
{
var httpClient = options.HttpClientFactory?.Invoke() ?? throw new InvalidOperationException("OtlpExporterOptions was missing HttpClientFactory or it returned null.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public sealed class OtlpLogExporter : BaseExporter<LogRecord>
{
private const int GrpcStartWritePosition = 5;
private readonly SdkLimitOptions sdkLimitOptions;
private readonly ExperimentalOptions experimentalOptions;
private readonly OtlpExporterTransmissionHandler transmissionHandler;
Expand Down Expand Up @@ -57,8 +58,8 @@ internal OtlpLogExporter(

this.experimentalOptions = experimentalOptions!;
this.sdkLimitOptions = sdkLimitOptions!;
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Logs);
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? GrpcStartWritePosition : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Logs);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand All @@ -73,14 +74,14 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
{
int writePosition = ProtobufOtlpLogSerializer.WriteLogsData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.experimentalOptions, this.Resource, logRecordBatch);

if (this.startWritePosition == 5)
if (this.startWritePosition == GrpcStartWritePosition)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
var dataLength = writePosition - GrpcStartWritePosition;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

Expand All @@ -89,13 +90,6 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
return ExportResult.Failure;
}
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
{
throw;
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
Expand All @@ -107,21 +101,4 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler?.Shutdown(timeoutMilliseconds) ?? true;

// TODO: Consider moving this to a shared utility class.
private bool IncreaseBufferSize()
{
var newBufferSize = this.buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public class OtlpMetricExporter : BaseExporter<Metric>
{
private const int GrpcStartWritePosition = 5;
private readonly OtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;

Expand Down Expand Up @@ -50,8 +51,8 @@ internal OtlpMetricExporter(
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(experimentalOptions != null, "experimentalOptions was null");

this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Metrics);
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? GrpcStartWritePosition : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetExportTransmissionHandler(experimentalOptions!, OtlpSignalType.Metrics);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand All @@ -66,14 +67,14 @@ public override ExportResult Export(in Batch<Metric> metrics)
{
int writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(this.buffer, this.startWritePosition, this.Resource, metrics);

if (this.startWritePosition == 5)
if (this.startWritePosition == GrpcStartWritePosition)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
var dataLength = writePosition - GrpcStartWritePosition;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

Expand All @@ -82,13 +83,6 @@ public override ExportResult Export(in Batch<Metric> metrics)
return ExportResult.Failure;
}
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
{
throw;
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
Expand All @@ -100,21 +94,4 @@ public override ExportResult Export(in Batch<Metric> metrics)

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler.Shutdown(timeoutMilliseconds);

// TODO: Consider moving this to a shared utility class.
private bool IncreaseBufferSize()
{
var newBufferSize = this.buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace OpenTelemetry.Exporter;
/// </summary>
public class OtlpTraceExporter : BaseExporter<Activity>
{
private const int GrpcStartWritePosition = 5;
private readonly SdkLimitOptions sdkLimitOptions;
private readonly OtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;
Expand Down Expand Up @@ -53,8 +54,8 @@ internal OtlpTraceExporter(
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");

this.sdkLimitOptions = sdkLimitOptions!;
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces);
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? GrpcStartWritePosition : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces);
}

internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
Expand All @@ -69,14 +70,14 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
{
int writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.Resource, activityBatch);

if (this.startWritePosition == 5)
if (this.startWritePosition == GrpcStartWritePosition)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
var dataLength = writePosition - GrpcStartWritePosition;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}

Expand Down
Loading

0 comments on commit 305597d

Please sign in to comment.