Skip to content

Commit

Permalink
[QUIC] QuicStream reading/writing work (#90253)
Browse files Browse the repository at this point in the history
* Added asserts to send buffer helper

* Postpone confirming the last RECEIVE event until the data are read

* Removed lock

* Debug tests

* ReadsClosed test fixed, and some better logging

* Final task keep alive, abort order, timeout for graceful write-side shutdown in dispose, named constants

* Tests

* Always wait for SEND_COMPLETE

* Exclude BigPayload on platforms where it can OOM

* Removed unintended code changes

* Reverted postponing reading FIN, if data have chance to get buffered with FIN, we will do that.

* Clean ups

* Fixed waiting for SEND_COMPLETE

* Hold back setting FinalTaskSource and overwrite result if no waiter is there

* Cancellation and completion

* Comments, fixed FinalTaskSource

* Fix assert

* Test reseting control stream made more resilient

* Attempt to fix still running write while disposing the stream in case of a cancellation

* Attempt to fix stress build

* Sync Dispose in H3Stream waits for read and write as well
  • Loading branch information
ManickaP authored Sep 13, 2023
1 parent 7eb6c0c commit 2984b87
Show file tree
Hide file tree
Showing 18 changed files with 680 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ internal sealed class Http3Connection : HttpConnectionBase

// Our control stream.
private QuicStream? _clientControl;
private Task _sendSettingsTask;

// Server-advertised SETTINGS_MAX_FIELD_SECTION_SIZE
// https://www.rfc-editor.org/rfc/rfc9114.html#section-7.2.4.1-2.2.1
Expand Down Expand Up @@ -88,7 +89,7 @@ public Http3Connection(HttpConnectionPool pool, HttpAuthority authority, QuicCon
}

// Errors are observed via Abort().
_ = SendSettingsAsync();
_sendSettingsTask = SendSettingsAsync();

// This process is cleaned up when _connection is disposed, and errors are observed via Abort().
_ = AcceptStreamsAsync();
Expand Down Expand Up @@ -150,6 +151,7 @@ private void CheckForShutdown()

if (_clientControl != null)
{
await _sendSettingsTask.ConfigureAwait(false);
await _clientControl.DisposeAsync().ConfigureAwait(false);
_clientControl = null;
}
Expand Down Expand Up @@ -486,7 +488,7 @@ private async Task ProcessServerStreamAsync(QuicStream stream)

if (bytesRead == 0)
{
// https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-unidirectional-streams
// https://www.rfc-editor.org/rfc/rfc9114.html#name-unidirectional-streams
// A sender can close or reset a unidirectional stream unless otherwise specified. A receiver MUST
// tolerate unidirectional streams being closed or reset prior to the reception of the unidirectional
// stream header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ internal sealed class Http3RequestStream : IHttpStreamHeadersHandler, IAsyncDisp
private TaskCompletionSource<bool>? _expect100ContinueCompletionSource; // True indicates we should send content (e.g. received 100 Continue).
private bool _disposed;
private readonly CancellationTokenSource _requestBodyCancellationSource;
private Task? _sendRequestTask; // Set with SendContentAsync, must be awaited before QuicStream.DisposeAsync();
private Task? _readResponseTask; // Set with ReadResponseAsync, must be awaited before QuicStream.DisposeAsync();

// Allocated when we receive a :status header.
private HttpResponseMessage? _response;
Expand Down Expand Up @@ -88,9 +90,25 @@ public void Dispose()
{
_disposed = true;
AbortStream();
// We aborted both sides, thus both task should unblock and should be finished before disposing the QuicStream.
WaitUnfinished(_sendRequestTask);
WaitUnfinished(_readResponseTask);
_stream.Dispose();
DisposeSyncHelper();
}

static void WaitUnfinished(Task? task)
{
if (task is not null && !task.IsCompleted)
{
try
{
task.GetAwaiter().GetResult();
}
catch // Exceptions from both tasks are logged via _connection.LogException() in case they're not awaited in SendAsync, so the exception can be ignored here.
{ }
}
}
}

private void RemoveFromConnectionIfDone()
Expand All @@ -107,9 +125,25 @@ public async ValueTask DisposeAsync()
{
_disposed = true;
AbortStream();
// We aborted both sides, thus both task should unblock and should be finished before disposing the QuicStream.
await AwaitUnfinished(_sendRequestTask).ConfigureAwait(false);
await AwaitUnfinished(_readResponseTask).ConfigureAwait(false);
await _stream.DisposeAsync().ConfigureAwait(false);
DisposeSyncHelper();
}

static async ValueTask AwaitUnfinished(Task? task)
{
if (task is not null && !task.IsCompleted)
{
try
{
await task.ConfigureAwait(false);
}
catch // Exceptions from both tasks are logged via _connection.LogException() in case they're not awaited in SendAsync, so the exception can be ignored here.
{ }
}
}
}

private void DisposeSyncHelper()
Expand Down Expand Up @@ -158,52 +192,51 @@ public async Task<HttpResponseMessage> SendAsync(CancellationToken cancellationT
await FlushSendBufferAsync(endStream: _request.Content == null, _requestBodyCancellationSource.Token).ConfigureAwait(false);
}

Task sendContentTask;
if (_request.Content != null)
{
sendContentTask = SendContentAsync(_request.Content!, _requestBodyCancellationSource.Token);
_sendRequestTask = SendContentAsync(_request.Content!, _requestBodyCancellationSource.Token);
}
else
{
sendContentTask = Task.CompletedTask;
_sendRequestTask = Task.CompletedTask;
}

// In parallel, send content and read response.
// Depending on Expect 100 Continue usage, one will depend on the other making progress.
Task readResponseTask = ReadResponseAsync(_requestBodyCancellationSource.Token);
_readResponseTask = ReadResponseAsync(_requestBodyCancellationSource.Token);
bool sendContentObserved = false;

// If we're not doing duplex, wait for content to finish sending here.
// If we are doing duplex and have the unlikely event that it completes here, observe the result.
// See Http2Connection.SendAsync for a full comment on this logic -- it is identical behavior.
if (sendContentTask.IsCompleted ||
if (_sendRequestTask.IsCompleted ||
_request.Content?.AllowDuplex != true ||
await Task.WhenAny(sendContentTask, readResponseTask).ConfigureAwait(false) == sendContentTask ||
sendContentTask.IsCompleted)
await Task.WhenAny(_sendRequestTask, _readResponseTask).ConfigureAwait(false) == _sendRequestTask ||
_sendRequestTask.IsCompleted)
{
try
{
await sendContentTask.ConfigureAwait(false);
await _sendRequestTask.ConfigureAwait(false);
sendContentObserved = true;
}
catch
{
// Exceptions will be bubbled up from sendContentTask here,
// which means the result of readResponseTask won't be observed directly:
// Exceptions will be bubbled up from _sendRequestTask here,
// which means the result of _readResponseTask won't be observed directly:
// Do a background await to log any exceptions.
_connection.LogExceptions(readResponseTask);
_connection.LogExceptions(_readResponseTask);
throw;
}
}
else
{
// Duplex is being used, so we can't wait for content to finish sending.
// Do a background await to log any exceptions.
_connection.LogExceptions(sendContentTask);
_connection.LogExceptions(_sendRequestTask);
}

// Wait for the response headers to be read.
await readResponseTask.ConfigureAwait(false);
await _readResponseTask.ConfigureAwait(false);

Debug.Assert(_response != null && _response.Content != null);
// Set our content stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1664,10 +1664,17 @@ public async Task ServerSendsTrailingHeaders_Success()

}

public enum CloseOutboundControlStream
{
BogusData,
Dispose,
Abort,
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ServerClosesOutboundControlStream_ClientClosesConnection(bool graceful)
[InlineData(CloseOutboundControlStream.BogusData)]
[InlineData(CloseOutboundControlStream.Dispose)]
[InlineData(CloseOutboundControlStream.Abort)]
public async Task ServerClosesOutboundControlStream_ClientClosesConnection(CloseOutboundControlStream closeType)
{
using Http3LoopbackServer server = CreateHttp3LoopbackServer();

Expand All @@ -1680,13 +1687,31 @@ public async Task ServerClosesOutboundControlStream_ClientClosesConnection(bool
await using Http3LoopbackStream requestStream = await connection.AcceptRequestStreamAsync();

// abort the control stream
if (graceful)
if (closeType == CloseOutboundControlStream.BogusData)
{
await connection.OutboundControlStream.SendResponseBodyAsync(Array.Empty<byte>(), isFinal: true);
}
else
else if (closeType == CloseOutboundControlStream.Dispose)
{
connection.OutboundControlStream.Abort(Http3LoopbackConnection.H3_INTERNAL_ERROR);
await connection.OutboundControlStream.DisposeAsync();
}
else if (closeType == CloseOutboundControlStream.Abort)
{
int iterations = 5;
while (iterations-- > 0)
{
connection.OutboundControlStream.Abort(Http3LoopbackConnection.H3_INTERNAL_ERROR);
// This sends RESET_FRAME which might cause complete discard of any data including stream type, leading to client ignoring the stream.
// Attempt to establish the control stream again then.
if (await semaphore.WaitAsync(100))
{
// Client finished with the expected error.
return;
}
await connection.OutboundControlStream.DisposeAsync();
await connection.EstablishControlStreamAsync(Array.Empty<SettingsEntry>());
await Task.Delay(100);
}
}

// wait for client task before tearing down the requestStream and connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
Define this here because the SDK resets it
unconditionally in Microsoft.NETCoreSdk.BundledVersions.props.
-->
<NETCoreAppMaximumVersion>8.0</NETCoreAppMaximumVersion>
<NETCoreAppMaximumVersion>9.0</NETCoreAppMaximumVersion>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Usage:
## ./build-local.ps1 [StressConfiguration] [LibrariesConfiguration]

$Version="8.0"
$Version="9.0"
$RepoRoot="$(git rev-parse --show-toplevel)"
$DailyDotnetRoot= "./.dotnet-daily"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
## Usage:
## ./build-local.sh [StressConfiguration] [LibrariesConfiguration]

version=8.0
version=9.0
repo_root=$(git rev-parse --show-toplevel)
daily_dotnet_root=./.dotnet-daily

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Microsoft.Quic;

Expand Down Expand Up @@ -32,8 +33,8 @@ private void FreeNativeMemory()
{
QUIC_BUFFER* buffers = _buffers;
_buffers = null;
NativeMemory.Free(buffers);
_count = 0;
NativeMemory.Free(buffers);
}

private void Reserve(int count)
Expand All @@ -48,6 +49,10 @@ private void Reserve(int count)

private void SetBuffer(int index, ReadOnlyMemory<byte> buffer)
{
Debug.Assert(index < _count);
Debug.Assert(_buffers[index].Buffer is null);
Debug.Assert(_buffers[index].Length == 0);

_buffers[index].Buffer = (byte*)NativeMemory.Alloc((nuint)buffer.Length, (nuint)sizeof(byte));
_buffers[index].Length = (uint)buffer.Length;
buffer.Span.CopyTo(_buffers[index].Span);
Expand Down Expand Up @@ -93,8 +98,8 @@ public void Reset()
}
byte* buffer = _buffers[i].Buffer;
_buffers[i].Buffer = null;
NativeMemory.Free(buffer);
_buffers[i].Length = 0;
NativeMemory.Free(buffer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using Microsoft.Quic;
using static Microsoft.Quic.MsQuic;

namespace System.Net.Quic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public override string ToString()
=> $"{{ {nameof(SEND_SHUTDOWN_COMPLETE.Graceful)} = {SEND_SHUTDOWN_COMPLETE.Graceful} }}",
QUIC_STREAM_EVENT_TYPE.SHUTDOWN_COMPLETE
=> $"{{ {nameof(SHUTDOWN_COMPLETE.ConnectionShutdown)} = {SHUTDOWN_COMPLETE.ConnectionShutdown}, {nameof(SHUTDOWN_COMPLETE.ConnectionShutdownByApp)} = {SHUTDOWN_COMPLETE.ConnectionShutdownByApp}, {nameof(SHUTDOWN_COMPLETE.ConnectionClosedRemotely)} = {SHUTDOWN_COMPLETE.ConnectionClosedRemotely}, {nameof(SHUTDOWN_COMPLETE.ConnectionErrorCode)} = {SHUTDOWN_COMPLETE.ConnectionErrorCode}, {nameof(SHUTDOWN_COMPLETE.ConnectionCloseStatus)} = {SHUTDOWN_COMPLETE.ConnectionCloseStatus} }}",
QUIC_STREAM_EVENT_TYPE.IDEAL_SEND_BUFFER_SIZE
=> $"{{ {nameof(IDEAL_SEND_BUFFER_SIZE.ByteCount)} = {IDEAL_SEND_BUFFER_SIZE.ByteCount} }}",
_ => string.Empty
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
// The .NET Foundation licenses this file to you under the MIT license.

#if DEBUG
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using Microsoft.Quic;
using static Microsoft.Quic.MsQuic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public int CopyFrom(ReadOnlySpan<QUIC_BUFFER> quicBuffers, int totalLength, bool
}
}

public int CopyTo(Memory<byte> buffer, out bool isCompleted, out bool isEmpty)
public int CopyTo(Memory<byte> buffer, out bool completed, out bool empty)
{
lock (_syncRoot)
{
Expand All @@ -79,8 +79,8 @@ public int CopyTo(Memory<byte> buffer, out bool isCompleted, out bool isEmpty)
_buffer.Discard(copied);
}

isCompleted = _buffer.IsEmpty && _final;
isEmpty = _buffer.IsEmpty;
completed = _buffer.IsEmpty && _final;
empty = _buffer.IsEmpty;

return copied;
}
Expand Down
Loading

0 comments on commit 2984b87

Please sign in to comment.