Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PushStream support to IApiRequest and support chunked-encoding #4989

Merged
merged 9 commits into from
Dec 28, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@
</assembly>
<assembly fullname="System.Memory">
<type fullname="System.Buffers.Binary.BinaryPrimitives" />
<type fullname="System.Buffers.StandardFormat" />
<type fullname="System.Buffers.Text.Utf8Formatter" />
<type fullname="System.MemoryExtensions" />
<type fullname="System.Runtime.InteropServices.MemoryMarshal" />
</assembly>
Expand All @@ -329,6 +331,7 @@
<type fullname="System.Net.Http.Headers.HttpRequestHeaders" />
<type fullname="System.Net.Http.Headers.HttpResponseHeaders" />
<type fullname="System.Net.Http.Headers.MediaTypeHeaderValue" />
<type fullname="System.Net.Http.Headers.NameValueHeaderValue" />
<type fullname="System.Net.Http.HttpClient" />
<type fullname="System.Net.Http.HttpClientHandler" />
<type fullname="System.Net.Http.HttpContent" />
Expand Down Expand Up @@ -359,6 +362,7 @@
<type fullname="System.Net.Sockets.AddressFamily" />
<type fullname="System.Net.Sockets.SocketError" />
<type fullname="System.Net.Sockets.SocketException" />
<type fullname="System.Net.TransportContext" />
</assembly>
<assembly fullname="System.Net.Requests">
<type fullname="System.Net.HttpWebRequest" />
Expand Down
3 changes: 3 additions & 0 deletions tracer/src/Datadog.Trace/Agent/IApiRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// </copyright>

using System;
using System.IO;
using System.Threading.Tasks;

namespace Datadog.Trace.Agent
Expand All @@ -17,5 +18,7 @@ internal interface IApiRequest
Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string contentType);

Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string contentType, string contentEncoding);

Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary);
}
}
62 changes: 30 additions & 32 deletions tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// </copyright>

using System;
using System.IO;
using System.Net;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -34,21 +35,11 @@ public void AddHeader(string name, string value)
_request.Headers.Add(name, value);
}

public async Task<IApiResponse> GetAsync()
public Task<IApiResponse> GetAsync()
{
ResetRequest(method: "GET", contentType: null, contentEncoding: null);

try
{
var httpWebResponse = (HttpWebResponse)await _request.GetResponseAsync().ConfigureAwait(false);
return new ApiWebResponse(httpWebResponse);
}
catch (WebException exception)
when (exception.Status == WebExceptionStatus.ProtocolError && exception.Response != null)
{
// If the exception is caused by an error status code, swallow the exception and let the caller handle the result
return new ApiWebResponse((HttpWebResponse)exception.Response);
}
return FinishAndGetResponse();
}

public Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string contentType)
Expand All @@ -63,17 +54,19 @@ public async Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string conte
await requestStream.WriteAsync(bytes.Array, bytes.Offset, bytes.Count).ConfigureAwait(false);
}

try
{
var httpWebResponse = (HttpWebResponse)await _request.GetResponseAsync().ConfigureAwait(false);
return new ApiWebResponse(httpWebResponse);
}
catch (WebException exception)
when (exception.Status == WebExceptionStatus.ProtocolError && exception.Response != null)
return await FinishAndGetResponse().ConfigureAwait(false);
}

public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary)
{
ResetRequest(method: "POST", ContentTypeHelper.GetContentType(contentType, multipartBoundary), contentEncoding);

using (var requestStream = await _request.GetRequestStreamAsync().ConfigureAwait(false))
{
// If the exception is caused by an error status code, ignore it and let the caller handle the result
return new ApiWebResponse((HttpWebResponse)exception.Response);
await writeToRequestStream(requestStream).ConfigureAwait(false);
}

return await FinishAndGetResponse().ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -167,17 +160,7 @@ public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
}
}

try
{
var httpWebResponse = (HttpWebResponse)await _request.GetResponseAsync().ConfigureAwait(false);
return new ApiWebResponse(httpWebResponse);
}
catch (WebException exception)
when (exception.Status == WebExceptionStatus.ProtocolError && exception.Response != null)
{
// If the exception is caused by an error status code, ignore it and let the caller handle the result
return new ApiWebResponse((HttpWebResponse)exception.Response);
}
return await FinishAndGetResponse().ConfigureAwait(false);
}

private void ResetRequest(string method, string contentType, string contentEncoding)
Expand All @@ -193,5 +176,20 @@ private void ResetRequest(string method, string contentType, string contentEncod
_request.Headers.Set(HttpRequestHeader.ContentEncoding, contentEncoding);
}
}

private async Task<IApiResponse> FinishAndGetResponse()
{
try
{
var httpWebResponse = (HttpWebResponse)await _request.GetResponseAsync().ConfigureAwait(false);
return new ApiWebResponse(httpWebResponse);
}
catch (WebException exception)
when (exception.Status == WebExceptionStatus.ProtocolError && exception.Response != null)
{
// If the exception is caused by an error status code, ignore it and let the caller handle the result
return new ApiWebResponse((HttpWebResponse)exception.Response);
}
}
}
}
16 changes: 16 additions & 0 deletions tracer/src/Datadog.Trace/Agent/Transports/ContentTypeHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// <copyright file="ContentTypeHelper.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

namespace Datadog.Trace.Agent.Transports;

internal static class ContentTypeHelper
{
public static string GetContentType(string contentType, string? multipartBoundary)
=> string.IsNullOrEmpty(multipartBoundary)
? contentType
: $"{contentType}; boundary={multipartBoundary}";
}
24 changes: 24 additions & 0 deletions tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,30 @@ public async Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string conte
}
}

public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary)
{
// re-create HttpContent on every retry because some versions of HttpClient always dispose of it, so we can't reuse.
using var content = new PushStreamContent(writeToRequestStream);

var contentTypeHeader = new MediaTypeHeaderValue(contentType);
if (!string.IsNullOrEmpty(multipartBoundary))
{
contentTypeHeader.Parameters.Add(new NameValueHeaderValue("boundary", multipartBoundary));
}

content.Headers.ContentType = contentTypeHeader;

if (!string.IsNullOrEmpty(contentEncoding))
{
content.Headers.ContentEncoding.Add(contentEncoding);
}

_postRequest.Content = content;
var response = await _client.SendAsync(_postRequest).ConfigureAwait(false);

return new HttpClientResponse(response);
}

public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
{
if (items is null)
Expand Down
16 changes: 12 additions & 4 deletions tracer/src/Datadog.Trace/Agent/Transports/HttpStreamRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,36 @@ public void AddHeader(string name, string value)
}

public async Task<IApiResponse> GetAsync()
=> (await SendAsync(WebRequestMethods.Http.Get, null, null, null).ConfigureAwait(false)).Item1;
=> (await SendAsync(WebRequestMethods.Http.Get, null, null, null, chunkedEncoding: false).ConfigureAwait(false)).Item1;

public Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string contentType)
=> PostAsync(bytes, contentType, contentEncoding: null);

public async Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string contentType, string contentEncoding)
=> (await SendAsync(WebRequestMethods.Http.Post, contentType, new BufferContent(bytes), contentEncoding).ConfigureAwait(false)).Item1;
=> (await SendAsync(WebRequestMethods.Http.Post, contentType, new BufferContent(bytes), contentEncoding, chunkedEncoding: false).ConfigureAwait(false)).Item1;

private async Task<Tuple<IApiResponse, HttpRequest>> SendAsync(string verb, string contentType, IHttpContent content, string contentEncoding)
public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary)
=> (await SendAsync(WebRequestMethods.Http.Post, contentType, new HttpOverStreams.HttpContent.PushStreamContent(writeToRequestStream), contentEncoding, chunkedEncoding: true, multipartBoundary).ConfigureAwait(false)).Item1;

private async Task<Tuple<IApiResponse, HttpRequest>> SendAsync(string verb, string contentType, IHttpContent content, string contentEncoding, bool chunkedEncoding, string multipartBoundary = null)
{
using (var bidirectionalStream = _streamFactory.GetBidirectionalStream())
{
if (contentType != null)
{
_headers.Add("Content-Type", contentType);
_headers.Add("Content-Type", ContentTypeHelper.GetContentType(contentType, multipartBoundary));
}

if (!string.IsNullOrEmpty(contentEncoding))
{
_headers.Add("Content-Encoding", contentEncoding);
}

if (chunkedEncoding)
{
_headers.Add("Transfer-Encoding", "chunked");
}

var request = new HttpRequest(verb, _uri.Host, _uri.PathAndQuery, _headers, content);
// send request, get response
var response = await _client.SendAsync(request, bidirectionalStream, bidirectionalStream).ConfigureAwait(false);
Expand Down
75 changes: 75 additions & 0 deletions tracer/src/Datadog.Trace/Agent/Transports/PushStreamContent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// <copyright file="PushStreamContent.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

// Based on code from https://github.com/aspnet/AspNetWebStack/blob/1231b77d79956152831b75ad7f094f844251b97f/src/System.Net.Http.Formatting/PushStreamContent.cs
// and https://github.com/aspnet/AspNetWebStack/blob/1231b77d79956152831b75ad7f094f844251b97f/src/System.Net.Http.Formatting/Internal/DelegatingStream.cs
// which is licensed as:
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

#nullable enable

#if NETCOREAPP

using System;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;

namespace Datadog.Trace.Agent.Transports;

/// <summary>
/// Provides an <see cref="HttpContent"/> implementation that exposes an output <see cref="Stream"/>
/// which can be written to directly. The ability to push data to the output stream differs from the
/// <see cref="StreamContent"/> where data is pulled and not pushed.
/// </summary>
internal class PushStreamContent : HttpContent
{
private readonly Func<Stream, Task> _onStreamAvailable;

/// <summary>
/// Initializes a new instance of the <see cref="PushStreamContent"/> class with the given <see cref="MediaTypeHeaderValue"/>.
/// </summary>
/// <param name="onStreamAvailable">The action to call when an output stream is available. When the
/// output stream is closed or disposed, it will signal to the content that it has completed and the
/// HTTP request or response will be completed.</param>
public PushStreamContent(Func<Stream, Task> onStreamAvailable)
{
_onStreamAvailable = onStreamAvailable;
}

/// <summary>
/// When this method is called, it calls the action provided in the constructor with the output
/// stream to write to. The action must not close or dispose the stream. Once the task completes,
/// it will close this content instance and complete the HTTP request or response.
/// </summary>
/// <param name="stream">The <see cref="Stream"/> to which to write.</param>
/// <param name="context">The associated <see cref="TransportContext"/>.</param>
/// <returns>A <see cref="Task"/> instance that is asynchronously serializing the object's content.</returns>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "Exception is passed as task result.")]
protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context)
{
// Note the callee must not close or dispose the stream because they don't own it
// We _could_ use a wrapper stream to enforce that, but then it _requires_ the
// callee to call close/dispose which seems weird
return _onStreamAvailable(stream);
}

/// <summary>
/// Computes the length of the stream if possible.
/// </summary>
/// <param name="length">The computed length of the stream.</param>
/// <returns><c>true</c> if the length has been computed; otherwise <c>false</c>.</returns>
protected override bool TryComputeLength(out long length)
{
// We can't know the length of the content being pushed to the output stream.
length = -1;
return false;
}
}
#endif
1 change: 1 addition & 0 deletions tracer/src/Datadog.Trace/Datadog.Trace.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from a source generator as opposed to something that is coming from some other tool. -->
<Compile Remove="$(GeneratedFolder)/*/**/*.cs" />
<Compile Update="Configuration\ConfigurationKeys.*.cs" DependentUpon="ConfigurationKeys.cs" />
<Compile Remove="Util\Streams\ChunkedEncodingWriteStream.cs" />
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this being removed? Also I don't see this file on that path is it supposed to be the HttpOverStreams\ChunkedEncodingWriteStream.cs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, looks like it got left over in various rebasing. I'll remove it in a subsequent PR (to save CI). Thanks!

</ItemGroup>

<ItemGroup>
Expand Down
Loading
Loading