Skip to content

Commit

Permalink
Merge branch 'user/kcoulombe/HandleMeterDisposal' of https://github.c…
Browse files Browse the repository at this point in the history
…om/stonkie/opentelemetry-dotnet into user/kcoulombe/HandleMeterDisposal
  • Loading branch information
stonkie committed Nov 29, 2024
2 parents f4fa00b + d12808d commit cc18547
Show file tree
Hide file tree
Showing 17 changed files with 301 additions and 1,026 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#if NETFRAMEWORK
using System.Net.Http;
#endif
using System.Net.Http.Headers;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

internal abstract class ProtobufOtlpExportClient : IProtobufExportClient
{
private static readonly Version Http2RequestVersion = new(2, 0);

#if NET
private static readonly bool SynchronousSendSupportedByCurrentPlatform;

static ProtobufOtlpExportClient()
{
#if NET
// See: https://github.com/dotnet/runtime/blob/280f2a0c60ce0378b8db49adc0eecc463d00fe5d/src/libraries/System.Net.Http/src/System/Net/Http/HttpClientHandler.AnyMobile.cs#L767
SynchronousSendSupportedByCurrentPlatform = !OperatingSystem.IsAndroid()
&& !OperatingSystem.IsIOS()
&& !OperatingSystem.IsTvOS()
&& !OperatingSystem.IsBrowser();
#endif
}
#endif

protected ProtobufOtlpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNull(httpClient);
Guard.ThrowIfNull(signalPath);

Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
this.HttpClient = httpClient;
}

internal HttpClient HttpClient { get; }

internal Uri Endpoint { get; }

internal IReadOnlyDictionary<string, string> Headers { get; }

internal abstract MediaTypeHeaderValue MediaTypeHeader { get; }

internal virtual bool RequireHttp2 => false;

public abstract ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default);

/// <inheritdoc/>
public bool Shutdown(int timeoutMilliseconds)
{
this.HttpClient.CancelPendingRequests();
return true;
}

protected HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength)
{
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);

if (this.RequireHttp2)
{
request.Version = Http2RequestVersion;

#if NET6_0_OR_GREATER
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
#endif
}

foreach (var header in this.Headers)
{
request.Headers.Add(header.Key, header.Value);
}

// TODO: Support compression.

request.Content = new ByteArrayContent(buffer, 0, contentLength);
request.Content.Headers.ContentType = this.MediaTypeHeader;

return request;
}

protected HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
{
#if NET
// Note: SendAsync must be used with HTTP/2 because synchronous send is
// not supported.
return this.RequireHttp2 || !SynchronousSendSupportedByCurrentPlatform
? this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult()
: this.HttpClient.Send(request, cancellationToken);
#else
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@
#endif
using System.Net.Http.Headers;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Grpc;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

/// <summary>Base class for sending OTLP export request over gRPC.</summary>
internal sealed class ProtobufOtlpGrpcExportClient : IProtobufExportClient
internal sealed class ProtobufOtlpGrpcExportClient : ProtobufOtlpExportClient
{
public const string GrpcStatusDetailsHeader = "grpc-status-details-bin";
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/grpc");
private static readonly Version Http2RequestVersion = new(2, 0);

private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrpcResponse
= new(
Expand All @@ -27,49 +25,34 @@ private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrp
grpcStatusDetailsHeader: null);

public ProtobufOtlpGrpcExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
: base(options, httpClient, signalPath)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNull(httpClient);
Guard.ThrowIfNull(signalPath);
Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds);

Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath);
this.Endpoint = new UriBuilder(exporterEndpoint).Uri;
this.Headers = options.GetHeaders<Dictionary<string, string>>((d, k, v) => d.Add(k, v));
this.HttpClient = httpClient;
}

internal HttpClient HttpClient { get; }
internal override MediaTypeHeaderValue MediaTypeHeader => MediaHeaderValue;

internal Uri Endpoint { get; set; }

internal IReadOnlyDictionary<string, string> Headers { get; }

internal int TimeoutMilliseconds { get; }
internal override bool RequireHttp2 => true;

/// <inheritdoc/>
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
public override ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
{
try
{
using var httpRequest = this.CreateHttpRequest(buffer, contentLength);
using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken);

try
{
httpResponse.EnsureSuccessStatusCode();
}
catch (HttpRequestException)
{
throw;
}
httpResponse.EnsureSuccessStatusCode();

var trailingHeaders = httpResponse.TrailingHeaders();
Status status = GrpcProtocolHelpers.GetResponseStatus(httpResponse, trailingHeaders);

if (status.Detail.Equals(Status.NoReplyDetailMessage))
{
#if NET
using var responseStream = httpResponse.Content.ReadAsStream(cancellationToken);
#else
using var responseStream = httpResponse.Content.ReadAsStreamAsync().GetAwaiter().GetResult();
#endif
int firstByte = responseStream.ReadByte();

if (firstByte == -1)
Expand Down Expand Up @@ -170,45 +153,11 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength,
}
}

public HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength)
{
var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint);
request.Version = Http2RequestVersion;

#if NET6_0_OR_GREATER
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
#endif

foreach (var header in this.Headers)
{
request.Headers.Add(header.Key, header.Value);
}

// TODO: Support compression.

request.Content = new ByteArrayContent(buffer, 0, contentLength);
request.Content.Headers.ContentType = MediaHeaderValue;

return request;
}

public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
{
return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult();
}

/// <inheritdoc/>
public bool Shutdown(int timeoutMilliseconds)
{
this.HttpClient.CancelPendingRequests();
return true;
}

private static bool IsTransientNetworkError(HttpRequestException ex)
{
return ex.InnerException is System.Net.Sockets.SocketException socketEx &&
(socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut ||
socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset ||
socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable);
return ex.InnerException is System.Net.Sockets.SocketException socketEx
&& (socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut
|| socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset
|| socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable);
}
}
Loading

0 comments on commit cc18547

Please sign in to comment.